uginroot / async-symfony-process
执行进程池的 Symfony 进程
v1.0.1
2021-01-05 03:25 UTC
Requires
- php: >=7.4
- symfony/process: ^3.4|^4.4|^5.0
Requires (Dev)
- phpunit/phpunit: 8.0.0
README
现有的库使用早期生成的进程池,如果有很多进程,这并不适合,如果在执行进程之后可能需要执行另一个进程,通常也不适合。这个库就是为了解决这个问题。
这个库基于 symfony/process 构建。
安装
composer require uginroot/async-symfony-process
使用
基础
在最简单的情况下,您只需指定一个将生成新进程的函数。如果函数返回零而不是进程,则所有当前进程执行完毕后执行将结束。
use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; $queue = range(1, 10); $processFactory = static function() use (&$queue):?Process{ if(count($queue) === 0){ return null; } $value = array_shift($queue); return Process::fromShellCommandline(sprintf('echo %d', $value)); }; $pool = new Pool(); $pool->setProcessFactory($processFactory); $pool->execute();
回调
如果您需要进程的结果,则需要设置一个回调函数,该函数将在进程完成后被调用。
use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; use Uginroot\AsyncSymfonyProcess\ProcessWrapper; $queue = range(1, 10); $results = []; $processFactory = static function() use (&$queue):?Process{ if(count($queue) === 0){ return null; } $value = array_shift($queue); return Process::fromShellCommandline(sprintf('echo %d', $value)); }; $callback = static function(ProcessWrapper $processWrapper) use (&$results):void{ $results[] = (int)$processWrapper->getOutput(); }; $pool = new Pool(); $pool->setProcessFactory($processFactory); $pool->setCallback($callback); $pool->execute();
输出监听器
如果进程是交互式的,或者您需要以某种方式对进程生成的错误做出反应,则需要设置输出监听器。它将接收当前进程的实例、输出类型(Process :: ERR 或 Process :: OUT)以及进程生成数据。
use Symfony\Component\Process\InputStream; use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; $queue = range(1, 10); $processFactory = static function() use (&$queue):?Process{ if(count($queue) === 0){ return null; } $value = array_shift($queue); $process = Process::fromShellCommandline(sprintf('echo %d', $value)); $process->setInput(new InputStream()); return $process; }; $outputListener = static function(Process $process, string $type, string $data):void{ /** @var InputStream $input */ $input = $process->getInput(); if($type === Process::ERR){ $input->write('exit;'); } elseif ($type === Process::OUT){ if($data === 'Say yes:'){ $input->write('yes'); } } }; $pool = new Pool(); $pool->setProcessFactory($processFactory); $pool->setOutputListener($outputListener); $pool->execute();
循环监听器
为了在每个进程执行循环的每次迭代中执行任何操作,您必须设置循环监听器。
use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; $queue = range(1, 10); $processFactory = static function() use (&$queue):?Process{ if(count($queue) === 0){ return null; } $value = array_shift($queue); return Process::fromShellCommandline(sprintf('echo %d', $value)); }; $whileListener = static function():void{ // pass }; $pool = new Pool(); $pool->setProcessFactory($processFactory); $pool->setWhileListener($whileListener); $pool->execute();
无限执行循环
为了防止进程执行周期在进程工厂返回零时停止,在配置池时必须通知这一点。您可以在任何被调用的函数中做出反应并补充进程列表,例如在 whileListener 中。如果您需要停止无限执行,则需要将此信息指定给池。
use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; $queue = []; $iteration = 0; $processFactory = static function() use (&$queue):?Process{ if(count($queue) === 0){ return null; } $value = array_shift($queue); return Process::fromShellCommandline(sprintf('echo %d', $value)); }; $whileListener = static function() use (&$queue, &$iteration, &$pool):void{ $iteration++; if($iteration === 20){ // Will end execution after executed and // newly generated processes have finished $pool->setIsEternal(false); } if($iteration % 5 === 0){ $queue[] = $iteration; } }; $pool = new Pool(); $pool->setProcessFactory($processFactory); $pool->setWhileListener($whileListener); $pool->setIsEternal(true); $pool->execute();
在类中使用
use Symfony\Component\Process\Process; use Uginroot\AsyncSymfonyProcess\Pool; use Uginroot\AsyncSymfonyProcess\ProcessWrapper; class AsyncProcess { private array $indexes; private array $results = []; public function __construct() { $this->indexes = range(1, 10); } public function processFactory():?Process { if(count($this->indexes) === 0){ return null; } $index = array_shift($this->indexes); return Process::fromShellCommandline(sprintf('echo %d', $index)); } public function processCallback(ProcessWrapper $processWrapper):void { $index = (int)$processWrapper->getOutput(); $this->results[] = $index; } public function run():void { $pool = new Pool(); $pool ->setProcessFactory([$this, 'processFactory']) ->setCallback([$this, 'processCallback']) ->execute() ; } }