libcast / worker-pool
Run tasks in a parallel processing worker pool.
v1.6.2
2016-04-01 12:40 UTC
Requires
- php: >=5.3.0
- ext-pcntl: *
- ext-posix: *
- ext-sockets: *
- ext-sysvsem: *
- psr/log: ~1.0
Requires (Dev)
- phpunit/phpunit: >=4.0.0
Suggests
- php: >=5.5.0
- jeremeamia/SuperClosure: >=1.0.1
This package is auto-updated.
Last update: 2024-08-26 00:44:34 UTC
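The package depends on several PHP extensions that are not enabled in every PHP build (see the Requires list above), so a small pre-flight check can save a confusing fatal error later. The following is only a sketch: the helper name `missingExtensions` is made up for illustration and is not part of the package; the extension names are the ones from the Requires list.

```php
<?php
/**
 * Hypothetical helper (not part of libcast/worker-pool):
 * returns the names of the given extensions that are not loaded.
 *
 * @param string[] $required extension names to check
 * @return string[] the extensions that are missing
 */
function missingExtensions(array $required)
{
    $missing = array();
    foreach ($required as $name) {
        if (!extension_loaded($name)) {
            $missing[] = $name;
        }
    }
    return $missing;
}

// Extension names taken from the "Requires" list of this package.
$missing = missingExtensions(array('pcntl', 'posix', 'sockets', 'sysvsem'));
if (count($missing) > 0) {
    echo "Missing PHP extensions: " . implode(', ', $missing) . "\n";
} else {
    echo "All required PHP extensions are loaded.\n";
}
```

Running this with the same PHP binary that will run the pool (usually the CLI) matters, because the CLI and web server configurations can load different extensions.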
README
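Parallel Processing WorkerPool for PHP
10K downloads within 4 months. Thank you very much! New features will be added on demand.
Examples
The WorkerPool class provides a very simple interface for passing data to a worker pool and having it processed. You can fetch the results from the workers at any time. Each worker child process can return any value that can be serialized.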
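To get a feeling for what "can be serialized" means in practice, the sketch below round-trips a value through PHP's native `serialize()` and `unserialize()`. It assumes the pool relies on PHP's native serialization, which this page does not state explicitly; `canBeSerialized` is a hypothetical helper, not part of the library.

```php
<?php
/**
 * Hypothetical helper: checks whether PHP's native serialize()
 * accepts a value without throwing.
 *
 * @param mixed $value
 * @return bool
 */
function canBeSerialized($value)
{
    try {
        serialize($value);
        return true;
    } catch (\Exception $e) {
        // e.g. closures are rejected by serialize()
        return false;
    }
}

// Scalars and nested arrays survive the round trip unchanged.
$result = array('id' => 7, 'tags' => array('a', 'b'), 'ratio' => 0.5);
$copy = unserialize(serialize($result));
var_dump($copy === $result);

// A closure cannot be serialized natively, so it is a poor return value.
var_dump(canBeSerialized(function () {}));
```

Plain arrays and scalars are the safest return values; objects work only if their classes are serializable and loadable in the parent process as well.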
A simple example
<?php

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the workload!
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
   ));

// queue ten tasks
for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
A more sophisticated example
<?php

use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\Worker;
use QXS\WorkerPool\Semaphore;

/**
 * Our Worker Class
 */
class MyWorker implements Worker {
    protected $sem;

    /**
     * after the worker has been forked into another process
     *
     * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing error an Exception will be thrown
     */
    public function onProcessCreate(Semaphore $semaphore) {
        // semaphore can be used in the run method to synchronize the workers
        $this->sem = $semaphore;
        // write something to the stdout
        echo "\t[".getmypid()."] has been created.\n";
        // initialize mt_rand (cast to int, because mt_srand() expects an integer seed)
        list($usec, $sec) = explode(' ', microtime());
        mt_srand((int) ((float) $sec + ((float) $usec * 100000)));
    }

    /**
     * before the worker process is getting destroyed
     *
     * @throws \Exception in case of a processing error an Exception will be thrown
     */
    public function onProcessDestroy() {
        // write something to the stdout
        echo "\t[".getmypid()."] will be destroyed.\n";
    }

    /**
     * run the work
     *
     * @param Serializable $input the data that the worker should process
     * @return Serializable Returns the result
     * @throws \Exception in case of a processing error an Exception will be thrown
     */
    public function run($input) {
        $input = (string)$input;
        echo "\t[".getmypid()."] Hi $input\n";
        sleep(mt_rand(0,10)); // this is the workload!
        // and sometimes exceptions might occur
        if(mt_rand(0,10)==9) {
            throw new \RuntimeException('We have a problem for '.$input.'.');
        }
        return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
    }
}

$wp = new WorkerPool();
$wp->setWorkerPoolSize(10)
   ->create(new MyWorker());

// produce some tasks
for($i=1; $i<=50; $i++) {
    $wp->run($i);
}

// some statistics
echo "Busy Workers:".$wp->getBusyWorkers()."  Free Workers:".$wp->getFreeWorkers()."\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results
foreach($wp as $val) {
    if(isset($val['data'])) {
        echo "RESULT: ".$val['data']."\n";
    }
    elseif(isset($val['workerException'])) {
        echo "WORKER EXCEPTION: ".$val['workerException']['class'].": ".$val['workerException']['message']."\n".$val['workerException']['trace']."\n";
    }
    elseif(isset($val['poolException'])) {
        echo "POOL EXCEPTION: ".$val['poolException']['class'].": ".$val['poolException']['message']."\n".$val['poolException']['trace']."\n";
    }
}

// write something, before the parent exits
echo "ByeBye\n";
Automatic respawn
You can choose to have dead workers respawned automatically.
<?php

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->respawnAutomatically()
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the workload!
            // Simulate unexpected worker death
            if (rand(1, 10) > 5) {
                exit;
            }
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
   ));

// queue ten tasks
for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
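Every time a worker dies, a new worker is created in its place, and the worker index is incremented.
You should avoid situations in which workers die in the first place. The respawn ability is a useful stopgap until you have fixed the underlying problem.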
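When a worker dies in the middle of a task, the result of that task may never reach the parent. How the pool reports such a task is not described here, so one cautious approach is to compare the inputs that were submitted with the results that came back. The sketch below assumes that each result is an array with a `data` key (as in the more sophisticated example) and that the worker returns its input unchanged (as the closure above does). `findMissingInputs` is a hypothetical helper, not part of the library.

```php
<?php
/**
 * Hypothetical helper: returns the submitted inputs for which
 * no result with a 'data' entry was collected.
 *
 * @param array $submitted the inputs passed to WorkerPool::run()
 * @param array|\Traversable $results the collected results
 * @return array inputs without a matching result
 */
function findMissingInputs(array $submitted, $results)
{
    $seen = array();
    foreach ($results as $val) {
        // Only successful results carry a 'data' entry (assumption).
        if (is_array($val) && array_key_exists('data', $val)) {
            $seen[] = $val['data'];
        }
    }
    return array_values(array_diff($submitted, $seen));
}

// Stand-in data; in a real script $results would be the pool itself
// after waitForAllWorkers() has returned.
$submitted = array(0, 1, 2, 3);
$results = array(
    array('data' => 0),
    array('data' => 3),
);
foreach (findMissingInputs($submitted, $results) as $input) {
    echo "No result for input $input\n";
}
```

Inputs reported as missing could then be submitted again, provided the task is safe to repeat.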
Transparent output to ps
See what is happening when you run ps:
root  2378  \_ simpleExample.php: Parent
root  2379      \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root  2380      \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root  2381      \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root  2382      \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
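Titles like the ones in this listing can be produced from PHP with `cli_set_process_title()`, which exists from PHP 5.5 on. Whether the library sets its titles exactly this way is an assumption; the sketch only rebuilds the format visible in the listing, and `buildWorkerTitle` is a hypothetical helper.

```php
<?php
/**
 * Hypothetical helper: builds a title in the format seen in the ps listing.
 *
 * @param string $script      script name, e.g. "simpleExample.php"
 * @param int    $index       worker number
 * @param string $workerClass fully qualified worker class name
 * @param bool   $busy        whether the worker is processing a task
 * @return string
 */
function buildWorkerTitle($script, $index, $workerClass, $busy)
{
    return sprintf(
        '%s: Worker %d of %s [%s]',
        $script,
        $index,
        $workerClass,
        $busy ? 'busy' : 'free'
    );
}

$title = buildWorkerTitle(basename(__FILE__), 1, 'QXS\WorkerPool\ClosureWorker', true);

// cli_set_process_title() only works on the CLI and may fail on some
// platforms, hence the guard and the error suppression.
if (PHP_SAPI === 'cli' && function_exists('cli_set_process_title')) {
    @cli_set_process_title($title);
}
echo $title . "\n";
```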
Documentation
The documentation can be found at http://qxsch.github.io/WorkerPool/doc/