PHP 8.5.0 Released!

Pool 类

(PECL pthreads >= 2.0.0)

简介

Pool 对象是多个 Worker 对象的容器,同时也是它们的控制器。

线程池是对 Worker 功能的高层抽象,包括按照 pthreads 需要的方式来管理应用的功能。

类摘要

class Pool {
/* 属性 */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* 方法 */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
publicresize(int $size): void
publicshutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

属性

size

Pool 对象可容纳的 Worker 对象的最大数量

class

Worker 的类

workers

指向 Worker 对象的引用

ctor

构造新的 Worker 对象时所需的参数

last

最后使用的 Worker 对象在池中的位置偏移量

目录

添加备注

用户贡献的备注 3 notes

up
6
meadowsjared at gmail dot com
9 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
class TestWork extends Threaded {
protected
$complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData){
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run(){
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function
isGarbage() {
return
$this->complete;
}
}
class
ExamplePool extends Pool
{
public
$data = array();
public function
process()
{
// Run this loop as long as we have
// jobs in the pool
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// If a task was marked as done
// collect its results
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return
$task->isGarbage();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
4
meadowsjared at gmail dot com
4 years ago
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23

<?php
class TestWork extends Threaded {
//updated version that works with pThreads v3.2.1 and php 7.3.23
protected $complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData) {
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run() {
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function
isDone() {
return
$this->complete;
}
}
class
ExamplePool extends Pool {
public
$data = array(); // used to return data after we're done
private $numTasks = 0; // counter used to know when we're done
/**
* override the submit function from the parent
* to keep track of our jobs
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* used to wait until all workers are done
*/
public function process() {
// Run this loop as long as we have
// jobs in the pool
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// If a task was marked as done, collect its results
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return
$task->isDone();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return
$this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for(
$i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
2
olavk
10 years ago
Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

class job extends Collectable {
public
$val;

public function
__construct($val){
// init some properties
$this->val = $val;
}
public function
run(){
// do some work
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
$this->setGarbage();
}
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
new
job('0'),
new
job('1'),
new
job('2'),
new
job('3'),
new
job('4'),
new
job('5'),
new
job('6'),
new
job('7'),
new
job('8'),
new
job('9'),
new
job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
$p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
echo
$checkingTask->val;
return
$checkingTask->isGarbage();
});

?>
To Top