luciansabo / async-command
使用Worker Pool帮助并行化在命令行脚本中运行的PHP代码执行
v1.0.0
2020-01-13 14:06 UTC
Requires
- guzzlehttp/promises: >=1.3
- symfony/process: >=3.4
Requires (Dev)
- phpunit/phpunit: ^8
This package is auto-updated.
Last update: 2024-09-14 02:53:19 UTC
README
这允许您在命令行脚本中使用时并行化PHP代码执行。Worker Pool概念使用固定数量的进程来执行任务。它类似于多线程,但使用进程而不是线程。
快速入门
此示例显示了如何并行执行4个HTTP GET请求
<?php use Lucian\AsyncCommand\EntryPoint\PhpScriptEntryPoint; use Lucian\AsyncCommand\WorkerPool; // include composer autoloader require_once 'vendor/autoload.php'; $urls = [ 'https://google.com', 'https://bing.com', 'https://yahoo.com', 'https://amazon.com', ]; $workerPool = new WorkerPool(null, 4); $entryPoint = new PhpScriptEntryPoint(__FILE__, 'workerCode'); foreach ($urls as $key => $url) { $promise = $workerPool->runAsync($entryPoint, $url, $key); $promise->then( function ($value) { // we receive what was returned from workerCode ([$url, strlen($contents)]) var_dump($value); return $value; }, function ($reason) { echo "\n\nErrors:\n$reason\n"; return $reason; } ); } $workerPool->wait(); function workerCode(string $url) { $contents = file_get_contents($url); // exceptions or fatal errors should result in a fail //throw new \Exception($url . ' caca'); //trigger_error("Warning", E_WARNING); //trigger_error("Fatal error", E_USER_ERROR); return [$url, strlen($contents)]; }
runAsync() 返回与Promises/A+兼容的Promise。所使用的Promise实现来自Guzzle。
Promise代表异步操作最终的结果。与Promise交互的主要方式是通过其then方法,该方法注册回调以接收Promise的最终值或Promise无法满足的原因。
use GuzzleHttp\Promise\PromiseInterface; /** @var PromiseInterface $promise */ //$promise = new Promise(); $promise->then( // $onFulfilled function ($value) { echo 'The promise was fulfilled.'; }, // $onRejected function ($reason) { echo 'The promise was rejected.'; } );
从同一类执行工作代码
要定义当前类中的入口点,请将 this:<method> 传递给 PhpScriptEntryPoint 方法参数。不要忘记使用 $this 作为父参数构建WorkerPool。
<?php use Lucian\AsyncCommand\EntryPoint\PhpScriptEntryPoint; use Lucian\AsyncCommand\WorkerPool; require_once 'vendor/autoload.php'; class SampleObjectMethodEntrypoint { private $workerPool; public function __construct() { $this->workerPool = new WorkerPool($this, 4); } public function execute() { $entryPoint = new PhpScriptEntryPoint(__FILE__, 'this:workerCode'); $param2 = 'test'; for ($i = 0; $i < 10; $i++) { $promise = $this->workerPool->runAsync($entryPoint, $i, $param2); $promise->then( function ($value) { // do something with the value echo "$value\n"; return $value; } ); } $this->workerPool->wait(); } public function workerCode(int $counter, string $param2) { return $param2 . $counter; } } $app = new SampleObjectMethodEntrypoint(); $app->execute();
workerCode() 是当前类中的一个方法。
在Symfony控制台命令中执行工作代码
要定义当前symfony命令中的入口点,请使用 SymfonyCommandEntryPoint
use Lucian\AsyncCommand\EntryPoint\SymfonyCommandEntryPoint; use Lucian\AsyncCommand\WorkerPool; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestAsyncCommand extends Command { /** * @var WorkerPool */ private $workerPool; public function __construct() { $this->workerPool = new WorkerPool($this, 4); parent::__construct(); } protected function execute(InputInterface $input, OutputInterface $output) { $entryPoint = new SymfonyCommandEntryPoint($this, 'workerCode'); $param2 = 'test'; for ($i = 0; $i < 10; $i++) { $promise = $this->workerPool->runAsync($entryPoint, $i, $param2); $promise->then( function ($value) { // do something with the value echo "$value\n"; return $value; } ); } $this->workerPool->wait(); } public function workerCode(int $counter, string $param2) { sleep(1); // simulate time consuming task return $param2 . $counter; } }