qxsch / worker-pool
Run tasks in a parallel-processing worker pool.
v2.0.2
2024-02-22 13:34 UTC
Requires
- php: >=7.1.0
- ext-pcntl: *
- ext-posix: *
- ext-sockets: *
- ext-sysvsem: *
Requires (Dev)
- phpunit/phpunit: >=4.0.0
Suggests
- php: >=8.0.0
- jeremeamia/SuperClosure: >=1.0.1
This package is not auto-updated.
Last update: 2024-09-22 23:20:02 UTC
README
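Parallel Processing WorkerPool for PHP
10K downloads within 4 months, thank you very much! We will add features based on demand.
Examples
The WorkerPool class provides a very simple interface for passing data to a pool of workers and having it processed. You can fetch the results from the workers at any time. Each worker child process can receive and return any value that can be serialized.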
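Because values travel between the parent and the worker processes in serialized form, a quick way to check whether a value is safe to pass to run() or to return from a worker is to round-trip it through PHP's own serialize()/unserialize(). The sketch below assumes the pool relies on PHP's standard serialization (the README only says "serializable"); the helper isTransferable() is hypothetical and not part of WorkerPool.

```php
<?php
// Sketch: does a value survive the serialize()/unserialize() round trip it
// has to make between the parent and a worker process?
// isTransferable() is a hypothetical helper, not part of WorkerPool.
function isTransferable($value): bool
{
    try {
        $copy = unserialize(serialize($value));
    } catch (\Throwable $e) {
        // e.g. closures: "Serialization of 'Closure' is not allowed"
        return false;
    }
    // resources do not throw, but are silently serialized as integer 0,
    // so also compare the copy with the original
    return $copy == $value;
}

var_dump(isTransferable(['id' => 42, 'tags' => ['a', 'b']])); // bool(true)
var_dump(isTransferable(function () { return 1; }));         // bool(false)
var_dump(isTransferable(fopen('php://memory', 'r')));         // bool(false)
```

Closures are the usual surprise here: returning or passing one fails unless you serialize it with a dedicated library (the package lists jeremeamia/SuperClosure as a suggestion).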
Simple example
<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    echo $val->dump() . "\n"; // dump the returned values
    // var_dump($val); // dump the returned values
}
A more complex example
<?php
use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\Semaphore;

/**
 * Our Worker Class
 */
class MyWorker implements WorkerInterface {

    protected $sem;

    /**
     * after the worker has been forked into another process
     *
     * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessCreate(Semaphore $semaphore) {
        // semaphore can be used in the run method to synchronize the workers
        $this->sem=$semaphore;
        // write something to the stdout
        echo "\t[".getmypid()."] has been created.\n";
        // initialize mt_rand in each child, so the forked workers do not share one random sequence
        list($usec, $sec) = explode(' ', microtime());
        mt_srand((int)( (float) $sec + ((float) $usec * 100000) ));
    }

    /**
     * before the worker process is getting destroyed
     *
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessDestroy() {
        // write something to the stdout
        echo "\t[".getmypid()."] will be destroyed.\n";
    }

    /**
     * run the work
     *
     * @param mixed $input the data that the worker should process (must be serializable)
     * @return mixed Returns the result (must be serializable)
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function run($input) {
        $input=(string)$input;
        echo "\t[".getmypid()."] Hi $input\n";
        sleep(mt_rand(0,10)); // this is the workload!
        // and sometimes exceptions might occur
        if(mt_rand(0,10)==9) {
            throw new \RuntimeException('We have a problem for '.$input.'.');
        }
        return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
    }
}

$wp=new WorkerPool();
$wp->setWorkerPoolSize(10)
   ->create(new MyWorker());

// produce some tasks
for($i=1; $i<=50; $i++) {
    $wp->run($i);
}

// some statistics
echo "Busy Workers:".$wp->getBusyWorkers()." Free Workers:".$wp->getFreeWorkers()."\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results
foreach($wp as $val) {
    if(isset($val['data'])) {
        echo "RESULT: ".$val['data']."\n";
    }
    elseif(isset($val['workerException'])) {
        echo "WORKER EXCEPTION: ".$val['workerException']['class'].": ".$val['workerException']['message']."\n".$val['workerException']['trace']."\n";
    }
    elseif(isset($val['poolException'])) {
        echo "POOL EXCEPTION: ".$val['poolException']['class'].": ".$val['poolException']['message']."\n".$val['poolException']['trace']."\n";
    }
}

// write something, before the parent exits
echo "ByeBye\n";
Synchronizing your workers
If you need to access shared resources, you can synchronize your workers.
<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            $semaphore->synchronizedBegin();
            try {
                // this code is being synchronized across all workers
                // so here we have just one worker at a time
                echo "[A][".getmypid()."]"." hi $input\n";
            }
            finally {
                $semaphore->synchronizedEnd();
            }

            // alternative example
            $semaphore->synchronize(function() use ($input, $storage) {
                // this code is being synchronized across all workers
                // so here we have just one worker at a time
                echo "[B][".getmypid()."]"." hi $input\n";
            });

            sleep(rand(1,3)); // this is the working load!
            return $input;
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
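The synchronizedBegin()/synchronizedEnd() pair and synchronize() give you a lock that spans all worker processes. Since the package requires ext-sysvsem, a System V semaphore is a plausible building block; the sketch below shows that idea with PHP's sem_get()/sem_acquire()/sem_release(). This is an assumption about the mechanism, not WorkerPool's code, and both the synchronized() helper and the ftok() key choice are hypothetical.

```php
<?php
// Sketch: a System V semaphore that only one process may hold at a time acts
// as a mutex across forked workers. Uses ext-sysvsem directly; this is an
// illustration, not WorkerPool's own implementation.

// Derive an IPC key. In a real pool the parent would create the semaphore
// before forking so every child shares it. Using the temp dir is an assumption.
$key = ftok(sys_get_temp_dir(), 'w');
$sem = sem_get($key, 1, 0666, true);  // max_acquire = 1, auto-release on shutdown

// hypothetical helper, modelled on the synchronize(callable) style
function synchronized($sem, callable $critical)
{
    sem_acquire($sem);       // blocks while another process holds the lock
    try {
        return $critical();
    } finally {
        sem_release($sem);   // released even if $critical throws
    }
}

$out = synchronized($sem, function () {
    return "[" . getmypid() . "] inside the critical section\n";
});
echo $out;

// a thrown exception still releases the semaphore, so it can be re-acquired
try {
    synchronized($sem, function () { throw new \RuntimeException('boom'); });
} catch (\RuntimeException $e) {
    echo "caught: ", $e->getMessage(), "\n";
}
$again = synchronized($sem, function () { return 'acquired again'; });
echo $again, "\n";

sem_remove($sem);            // delete the semaphore when the pool is done
```

The try/finally is the important part: without it, an exception inside the critical section would leave the semaphore held and every other worker would block forever.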
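Disabling the semaphore (the ability to synchronize workers)
You can optionally disable the semaphore. Some users complained that a semaphore gets opened even though they do not need it at all.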
<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->disableSemaphore() // <--- this disables the semaphore support (you can still use it in the worker, but it will have no effect)
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
Automatic respawning
You can optionally have dead workers respawned automatically.
<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->respawnAutomatically()
   ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            echo "[".getmypid()."]"." hi $input\n";
            sleep(rand(1,3)); // this is the working load!
            // Simulate unexpected worker death
            if (rand(1, 10) > 5) exit;
            return $input; // return null here, in case you do not want to pass any data to the parent
        }
    )
);

for($i=0; $i<10; $i++) {
    $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
    var_dump($val); // dump the returned values
}
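Each time a worker dies, a new worker is created with an incremented index.
You should avoid workers dying in the first place, but respawning can be a useful workaround until you have fixed the underlying problem.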
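Respawning boils down to the parent waiting for any child to exit and forking a replacement that gets the next index. The sketch below illustrates that loop with plain ext-pcntl calls (pcntl_fork(), pcntl_waitpid()); it is not WorkerPool's implementation, and the $maxSpawns cap is a hypothetical addition that only exists so the demo terminates. Run it with the CLI SAPI.

```php
<?php
// Sketch of automatic respawning with plain ext-pcntl (CLI only).
// Not WorkerPool's implementation; $maxSpawns is a hypothetical cap
// that only exists so this demo terminates.
$poolSize  = 2;
$maxSpawns = 4;
$nextIndex = 1;      // index handed to the next worker that is forked
$children  = [];     // pid => worker index

$spawn = function () use (&$nextIndex, &$children): int {
    $index = $nextIndex++;
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new \RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // child: pretend to work for 0.1 s, then die
        usleep(100000);
        exit(0);
    }
    $children[$pid] = $index;   // parent: remember which index this pid has
    return $index;
};

// start the initial pool
for ($i = 0; $i < $poolSize; $i++) {
    $spawn();
}

$spawned = $poolSize;
while ($children) {
    $pid = pcntl_waitpid(-1, $status);  // block until any child exits
    if ($pid <= 0) {
        break;                          // no children left, or an error
    }
    echo "worker {$children[$pid]} died\n";
    unset($children[$pid]);
    if ($spawned < $maxSpawns) {        // replace it under an incremented index
        echo "respawned as worker " . $spawn() . "\n";
        $spawned++;
    }
}
```

Children exit in whatever order the scheduler picks, so the order of the printed lines varies between runs; only the totals are fixed (four workers with indices 1 to 4).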
Transparent output to ps
See what is going on when you run ps:
root 2378 \_ simpleExample.php: Parent
root 2379 \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root 2380 \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root 2381 \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root 2382 \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
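A process can rename itself so that ps shows its role and state. PHP's CLI offers cli_set_process_title() (PHP 5.5+) for this; whether WorkerPool uses exactly this call is an assumption. The title format below mirrors the listing above, and the workerTitle() helper is hypothetical.

```php
<?php
// Sketch: build a ps-friendly title and apply it to the current process.
// workerTitle() is hypothetical; cli_set_process_title() is standard PHP (CLI SAPI).
function workerTitle(string $script, int $index, string $class, bool $busy): string
{
    return sprintf('%s: Worker %d of %s [%s]', $script, $index, $class, $busy ? 'busy' : 'free');
}

$title = workerTitle('simpleExample.php', 1, 'QXS\WorkerPool\ClosureWorker', true);
echo $title, "\n";
// simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]

if (PHP_SAPI === 'cli') {
    // may fail on some platforms (e.g. macOS); the title is cosmetic, so ignore errors
    @cli_set_process_title($title);
}
```

Updating the title whenever a worker switches between busy and free is what makes the ps listing a cheap live status view.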
Documentation
The documentation can be found here: http://qxsch.github.io/WorkerPool/doc/