qxsch/worker-pool

Run tasks in a parallel-processing worker pool.


v2.0.2 2024-02-22 13:34 UTC


A parallel-processing worker pool for PHP

10K downloads within 4 months, thank you very much! We will add features based on demand.

Examples

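The WorkerPool class provides a very simple interface for passing data to a pool of workers and having it processed. You can fetch the results from the workers at any time. Each worker child process can receive and return any value that can be serialized.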
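Because inputs and results cross the parent/child process boundary in serialized form, it can help to check up front that your values survive a `serialize()`/`unserialize()` round trip. The sketch below is only an illustration in plain PHP (the helper name `roundTrips` is hypothetical, not part of the library): arrays and scalars survive, while a closure cannot be serialized at all.

```php
<?php
// Hypothetical helper: does $value survive a serialize()/unserialize() round trip?
function roundTrips($value): bool
{
    try {
        return unserialize(serialize($value)) == $value;
    } catch (\Throwable $e) {
        // serialize() throws for values such as closures
        return false;
    }
}

var_dump(roundTrips(['id' => 42, 'tags' => ['a', 'b']])); // bool(true)
var_dump(roundTrips(function () { return 1; }));          // bool(false)
```

If a task needs something that cannot be serialized (a closure, a database connection), pass the data needed to recreate it instead.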

Simple example

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
                        /**
                          * @param mixed $input the input from the WorkerPool::run() Method
                          * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
                          * @param \ArrayObject $storage a persistent storage for the current child process
                          */
                        function($input, $semaphore, $storage) {
                                echo "[".getmypid()."]"." hi $input\n";
                                sleep(rand(1,3)); // this is the working load!
                                return $input; // return null here, in case you do not want to pass any data to the parent 
                        }
                )
);


for($i=0; $i<10; $i++) {
        $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
        echo $val->dump() . "\n";  // dump the returned values
        // var_dump($val);  // dump the returned values
}
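The worker above is just a closure with the signature `function($input, $semaphore, $storage)`, so its logic can be exercised in a single process before it is handed to the pool. This sketch assumes that passing `null` for the semaphore is fine when the closure never uses it, and a fresh `\ArrayObject` stands in for the per-child storage; the `tasksHandled` key is purely illustrative.

```php
<?php
// Same shape of closure you would wrap in ClosureWorker; it also counts
// how many tasks this (simulated) child process has handled.
$work = function ($input, $semaphore, \ArrayObject $storage) {
    // $storage persists between calls within one child, so it can hold state
    $storage['tasksHandled'] = ($storage['tasksHandled'] ?? 0) + 1;
    return $input * 2;
};

// Simulate one child: a single storage object shared by all of its calls.
$storage = new \ArrayObject();
$results = [];
foreach ([1, 2, 3] as $i) {
    $results[] = $work($i, null, $storage); // no semaphore needed outside the pool
}

echo implode(', ', $results), "\n";  // 2, 4, 6
echo $storage['tasksHandled'], "\n"; // 3
```

Inside the real pool each child has its own storage, so a counter like this would be per child, not global.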

A more complex example

<?php

use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\Semaphore;

/**
 * Our Worker Class
 */
class MyWorker implements WorkerInterface {
        protected $sem;
        /**
         * after the worker has been forked into another process
         *
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
         * @throws \Exception in case of a processing Error an Exception will be thrown
         */
        public function onProcessCreate(Semaphore $semaphore) {
                // semaphore can be used in the run method to synchronize the workers
                $this->sem=$semaphore;
                // write something to the stdout
                echo "\t[".getmypid()."] has been created.\n";
                // initialize mt_rand
                list($usec, $sec) = explode(' ', microtime());
                mt_srand((int)( (float) $sec + ((float) $usec * 100000) ));
        }
        /**
         * before the worker process is getting destroyed
         *
         * @throws \Exception in case of a processing Error an Exception will be thrown
         */
        public function onProcessDestroy() {
                // write something to the stdout
                echo "\t[".getmypid()."] will be destroyed.\n";
        }
        /**
         * run the work
         *
         * @param mixed $input the data that the worker should process (any serializable value)
         * @return mixed Returns the result (any serializable value)
         * @throws \Exception in case of a processing Error an Exception will be thrown
         */
        public function run($input) {
                $input=(string)$input;
                echo "\t[".getmypid()."] Hi $input\n";
                sleep(mt_rand(0,10)); // this is the workload!
                // and sometimes exceptions might occur
                if(mt_rand(0,10)==9) {
                        throw new \RuntimeException('We have a problem for '.$input.'.');
                }
                return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
        }
}


$wp=new WorkerPool();
$wp->setWorkerPoolSize(10)
   ->create(new MyWorker());

// produce some tasks
for($i=1; $i<=50; $i++) {
        $wp->run($i);
}

// some statistics
echo "Busy Workers:".$wp->getBusyWorkers()."  Free Workers:".$wp->getFreeWorkers()."\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results
foreach($wp as $val) {
        if(isset($val['data'])) {
                echo "RESULT: ".$val['data']."\n";
        }
        elseif(isset($val['workerException'])) {
                echo "WORKER EXCEPTION: ".$val['workerException']['class'].": ".$val['workerException']['message']."\n".$val['workerException']['trace']."\n";
        }
        elseif(isset($val['poolException'])) {
                echo "POOL EXCEPTION: ".$val['poolException']['class'].": ".$val['poolException']['message']."\n".$val['poolException']['trace']."\n";
        }
}


// write something, before the parent exits
echo "ByeBye\n";
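To unit-test a worker class without forking, you can drive its lifecycle by hand in one process. The sketch below is an approximation, not the library's implementation: `LifecycleWorker`, `runLifecycle()` and `GreetingWorker` are hypothetical names, the semaphore argument is dropped, and exceptions are packed into the same `data` / `workerException` (`class`, `message`, `trace`) keys that the result loop above reads.

```php
<?php
// Hypothetical stand-in for WorkerInterface, without the Semaphore parameter.
interface LifecycleWorker
{
    public function onProcessCreate(): void;
    public function run($input);
    public function onProcessDestroy(): void;
}

// Runs create -> run (once per input) -> destroy in the current process and
// collects results in the array shape used by the example above.
function runLifecycle(LifecycleWorker $worker, array $inputs): array
{
    $results = [];
    $worker->onProcessCreate();
    try {
        foreach ($inputs as $input) {
            try {
                $results[] = ['data' => $worker->run($input)];
            } catch (\Throwable $e) {
                $results[] = ['workerException' => [
                    'class'   => get_class($e),
                    'message' => $e->getMessage(),
                    'trace'   => $e->getTraceAsString(),
                ]];
            }
        }
    } finally {
        $worker->onProcessDestroy();
    }
    return $results;
}

// A tiny hypothetical worker that fails on one specific input.
class GreetingWorker implements LifecycleWorker
{
    public array $events = [];
    public function onProcessCreate(): void { $this->events[] = 'create'; }
    public function onProcessDestroy(): void { $this->events[] = 'destroy'; }
    public function run($input)
    {
        if ($input === 3) {
            throw new \RuntimeException("We have a problem for $input.");
        }
        return "Hi $input";
    }
}

$worker = new GreetingWorker();
foreach (runLifecycle($worker, [1, 2, 3]) as $val) {
    if (isset($val['data'])) {
        echo "RESULT: {$val['data']}\n";
    } elseif (isset($val['workerException'])) {
        echo "WORKER EXCEPTION: {$val['workerException']['class']}: {$val['workerException']['message']}\n";
    }
}
```

Catching exceptions per task, rather than around the whole loop, keeps one failing input from discarding the results of the others.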

Synchronizing your workers

If you need to access shared resources, you can synchronize your workers.

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->create(new \QXS\WorkerPool\ClosureWorker(
                        /**
                          * @param mixed $input the input from the WorkerPool::run() Method
                          * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
                          * @param \ArrayObject $storage a persistent storage for the current child process
                          */
                        function($input, $semaphore, $storage) {
                                $semaphore->synchronizedBegin();
                                try {
                                        // this code is synchronized across all workers,
                                        // so only one worker at a time runs it
                                        echo "[A][".getmypid()."]"." hi $input\n";
                                }
                                finally {
                                        $semaphore->synchronizedEnd();
                                }

                                // alternative example
                                $semaphore->synchronize(function() use ($input) {
                                        // this code is synchronized across all workers,
                                        // so only one worker at a time runs it
                                        echo "[B][".getmypid()."]"." hi $input\n";
                                });
                                sleep(rand(1,3)); // this is the working load!
                                return $input;
                        }
                )
);


for($i=0; $i<10; $i++) {
        $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
        var_dump($val);  // dump the returned values
}
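The first style wraps the critical section in `try`/`finally` so the lock is released even if the code inside throws; a callback helper such as `synchronize()` can hide that boilerplate. As a rough illustration of the same pattern in plain PHP, the sketch below swaps in an exclusive `flock()` on a lock file instead of the library's semaphore; `withFileLock()` and the lock-file path are hypothetical.

```php
<?php
// Hypothetical helper: run $critical while holding an exclusive lock on $lockFile.
// flock() locks are advisory and honoured by every process that locks the same file.
function withFileLock(string $lockFile, callable $critical)
{
    $fp = fopen($lockFile, 'c'); // create if missing, do not truncate
    if ($fp === false) {
        throw new \RuntimeException("Cannot open lock file $lockFile");
    }
    try {
        if (!flock($fp, LOCK_EX)) { // blocks until no other process holds the lock
            throw new \RuntimeException("Cannot lock $lockFile");
        }
        try {
            return $critical();
        } finally {
            flock($fp, LOCK_UN); // always release, even if $critical threw
        }
    } finally {
        fclose($fp);
    }
}

$lock = sys_get_temp_dir() . '/workerpool-example.lock';
echo withFileLock($lock, function () {
    return "[" . getmypid() . "] inside the critical section\n";
});
```

The helper opens the file on every call, so each forked child gets its own open file description. A handle opened before forking would be shared by the parent and its children, and `flock()` would then not exclude them from each other.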

Disabling the semaphore (the ability to synchronize workers)

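You can optionally disable the semaphore. Some users complained that a semaphore gets opened even though they do not need it at all.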

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->disableSemaphore()  // <--- this disables the semaphore support (you can still use it in the worker, but it will have no effect)
   ->create(new \QXS\WorkerPool\ClosureWorker(
                        /**
                          * @param mixed $input the input from the WorkerPool::run() Method
                          * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
                          * @param \ArrayObject $storage a persistent storage for the current child process
                          */
                        function($input, $semaphore, $storage) {
                                echo "[".getmypid()."]"." hi $input\n";
                                sleep(rand(1,3)); // this is the working load!
                                return $input; // return null here, in case you do not want to pass any data to the parent 
                        }
                )
);


for($i=0; $i<10; $i++) {
        $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
        var_dump($val);  // dump the returned values
}
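Because the worker can still call the semaphore after `disableSemaphore()`, worker code does not have to change. One common way to get that property is a null object that keeps the same methods but does no locking. The sketch below illustrates the idea under the assumption that a disabled semaphore behaves roughly like this; `Lock` and `NoopLock` are hypothetical names, not library classes.

```php
<?php
// Hypothetical interface mirroring the calls used in the synchronization example.
interface Lock
{
    public function synchronizedBegin(): void;
    public function synchronizedEnd(): void;
    public function synchronize(callable $fn);
}

// Null object: same API, no locking at all.
class NoopLock implements Lock
{
    public function synchronizedBegin(): void {}
    public function synchronizedEnd(): void {}
    public function synchronize(callable $fn)
    {
        return $fn(); // just run the callback, nothing is locked
    }
}

// Worker code written against Lock runs unchanged with the no-op version.
$lock = new NoopLock();
$lock->synchronizedBegin();
try {
    echo "[A] hi\n";
} finally {
    $lock->synchronizedEnd();
}
echo $lock->synchronize(fn() => "[B] hi\n");
```

The trade-off is that code relying on mutual exclusion silently loses it, so only disable the semaphore when no worker touches shared resources.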

Automatic respawning

You can optionally have dead workers respawned automatically.

<?php

$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
   ->respawnAutomatically()
   ->create(new \QXS\WorkerPool\ClosureWorker(
                        /**
                          * @param mixed $input the input from the WorkerPool::run() Method
                          * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
                          * @param \ArrayObject $storage a persistent storage for the current child process
                          */
                        function($input, $semaphore, $storage) {
                                echo "[".getmypid()."]"." hi $input\n";
                                sleep(rand(1,3)); // this is the working load!

                                // Simulate unexpected worker death
                                if (rand(1, 10) > 5) exit;

                                return $input; // return null here, in case you do not want to pass any data to the parent 
                        }
                )
);


for($i=0; $i<10; $i++) {
        $wp->run($i);
}

$wp->waitForAllWorkers(); // wait for all workers

foreach($wp as $val) {
        var_dump($val);  // dump the returned values
}

Each time a worker dies, a new worker is created with an incremented index.

You should avoid letting workers die, but the respawn capability can be a useful stopgap until you have fixed the underlying problem.
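The respawn behaviour can be pictured as simple bookkeeping: the pool keeps a fixed number of slots, and whenever a worker dies its slot is refilled by a new worker with the next index. The sketch below only simulates that bookkeeping in one process (no forking, no signal handling); `RespawningPool` and its methods are hypothetical, and the library's internals may differ.

```php
<?php
// Hypothetical simulation of respawn bookkeeping; it does not fork anything.
class RespawningPool
{
    private int $nextIndex = 1;
    /** @var array<int,int> slot number => worker index */
    private array $slots = [];

    public function __construct(int $size)
    {
        for ($slot = 0; $slot < $size; $slot++) {
            $this->slots[$slot] = $this->nextIndex++;
        }
    }

    // Called when the worker in $slot exits unexpectedly.
    public function workerDied(int $slot): int
    {
        $this->slots[$slot] = $this->nextIndex++; // replacement gets a fresh index
        return $this->slots[$slot];
    }

    public function indices(): array
    {
        return array_values($this->slots);
    }
}

$pool = new RespawningPool(4);             // workers 1..4
$pool->workerDied(1);                      // worker 2 dies, replaced by worker 5
$pool->workerDied(3);                      // worker 4 dies, replaced by worker 6
echo implode(' ', $pool->indices()), "\n"; // 1 5 3 6
```

Because indices are never reused, a steadily climbing worker number is an easy signal that workers keep dying.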

Transparent output in ps

See what is going on when you run ps:

root   2378   \_ simpleExample.php: Parent
root   2379       \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root   2380       \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root   2381       \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root   2382       \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
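Titles like these replace the usual `php script.php` command line in `ps`. In plain PHP (5.5+, CLI only), one way to get a similar effect is `cli_set_process_title()`. The sketch below builds a title in the format shown above and applies it; whether the library itself uses this function is not stated here, and `workerTitle()` is a hypothetical helper.

```php
<?php
// Hypothetical helper that builds a title in the format of the ps listing above.
function workerTitle(string $script, int $index, string $class, bool $busy): string
{
    return sprintf('%s: Worker %d of %s [%s]', basename($script), $index, $class, $busy ? 'busy' : 'free');
}

$title = workerTitle($_SERVER['argv'][0] ?? 'simpleExample.php', 1, 'QXS\\WorkerPool\\ClosureWorker', true);

// cli_set_process_title() only exists in the CLI SAPI and may fail on some platforms.
if (function_exists('cli_set_process_title')) {
    @cli_set_process_title($title);
}
echo $title, "\n";
```

Updating the title when a worker switches between busy and free is what makes a `ps` listing like the one above useful for spotting stuck workers.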

Documentation

The documentation can be found here: http://qxsch.github.io/WorkerPool/doc/