hdvianna / parallel-workerpool
1.1.2
2020-09-01 02:34 UTC
Requires
- php: ^7.2
- ext-curl: *
- ext-parallel: *
README
Parallel worker pool 使用 PHP parallel 扩展(https://php.ac.cn/manual/en/book.parallel.php)提供了一个简单接口来处理任务的并行化。
使用方法
WorkerPool
需要实现 WorkFactoryInterface
,该接口负责创建 消费者 和 生产者 闭包。生产者闭包必须返回一个 生成器。
Composer 安装
composer require hdvianna/parallel-workerpool
使用 Docker 运行
docker-compose up
Docker compose 会构建一个包含所需扩展的环境,并将当前目录创建为绑定挂载。
示例
在这个示例中,10 个工作进程将分别休眠 n 毫秒,每次它们都从 WorkFactory 生成的任务中消费。
use hdvianna\Concurrent\WorkFactoryInterface; use hdvianna\Concurrent\WorkerPool; (new WorkerPool(new class implements WorkFactoryInterface { public function createWorkGeneratorClosure(): \Closure { return function () { for ($i = 0; $i < 100; $i++) { $work = new \stdClass(); $work->time = mt_rand(300, 1000); $work->id = $i; yield $work; } }; } public function createWorkConsumerClosure(): \Closure { return function($work) { printf("[$work->id]: Sleeping for %d milliseconds ...%s", $work->time, PHP_EOL); usleep($work->time * 1000); printf("[$work->id]: Woke up after %d milliseconds ...%s", $work->time, PHP_EOL); }; } }, 10))->run();
同步数据
可以通过向工作函数发送锁和解锁闭包来同步数据。共享数据从 $lock
闭包接收,发送到 $unlock
闭包。可以通过调用 WorkerPool::lastValue()
获取最后发送的值。
use hdvianna\Concurrent\WorkFactoryInterface; use hdvianna\Concurrent\WorkerPool; $sharedData = 700; $works = 1000; $pool = new WorkerPool((new class ($sharedData, $works) implements WorkFactoryInterface { /** * @var int */ private $sharedData; /** * @var int */ private $works; /*** * constructor. * @param int $sharedData * @param int $works */ public function __construct($sharedData, $works) { $this->works = $works; $this->sharedData = $sharedData; } public function createWorkGeneratorClosure(): \Closure { $workers = $this->works; return function () use ($workers) { for ($i = 0; $i < $workers; $i++) { $work = new \stdClass(); $work->value = 1; yield $work; } }; } public function createWorkConsumerClosure(): \Closure { $initialValue = $this->sharedData; //Use the $lock and $unlock closures to synchronize data return function ($work, $lock, $unlock) use ($initialValue) { /*Synchronize the data. Will block and wait for data. $lock will return the last value*/ $shared = $lock(); if (!isset($shared)) { //Data was not initialized $shared = $initialValue; } $shared += $work->value; //Unlocks sending the new data. $unlock($shared); }; } }), 10); $pool->run(); //Get the last value sent to the unlock closure $result = $pool->lastValue(); echo("\$result equals to \$works + \$sharedData?" . PHP_EOL); echo("($result equals to $works + $sharedData?)" . PHP_EOL); echo(assert($result === ($works + $sharedData)) ? "Yes!": "No =(").PHP_EOL;