uginroot/async-symfony-process

执行进程池的 Symfony 进程

v1.0.1 2021-01-05 03:25 UTC

This package is auto-updated.

Last update: 2024-09-05 11:13:57 UTC


README

现有的库使用早期生成的进程池,如果有很多进程,这并不适合,如果在执行进程之后可能需要执行另一个进程,通常也不适合。这个库就是为了解决这个问题。

这个库基于 symfony/process 构建。

安装

composer require uginroot/async-symfony-process

使用

基础

在最简单的情况下,您只需指定一个将生成新进程的函数。如果函数返回零而不是进程,则所有当前进程执行完毕后执行将结束。

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->execute();

回调

如果您需要进程的结果,则需要设置一个回调函数,该函数将在进程完成后被调用。

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

$queue = range(1, 10);
$results = [];

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$callback = static function(ProcessWrapper $processWrapper) use (&$results):void{
    $results[] = (int)$processWrapper->getOutput();
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setCallback($callback);
$pool->execute();

输出监听器

如果进程是交互式的,或者您需要以某种方式对进程生成的错误做出反应,则需要设置输出监听器。它将接收当前进程的实例、输出类型(Process :: ERR 或 Process :: OUT)以及进程生成数据。

use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    $process = Process::fromShellCommandline(sprintf('echo %d', $value));
    $process->setInput(new InputStream());
    return $process;
};

$outputListener = static function(Process $process, string $type, string $data):void{
    /** @var InputStream $input */
    $input = $process->getInput();
    if($type === Process::ERR){
        $input->write('exit;');
    } elseif ($type === Process::OUT){
        if($data === 'Say yes:'){
            $input->write('yes');
        }
    }
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setOutputListener($outputListener);
$pool->execute();

循环监听器

为了在每个进程执行循环的每次迭代中执行任何操作,您必须设置循环监听器。

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$whileListener = static function():void{
    // pass
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->execute();

无限执行循环

为了防止进程执行周期在进程工厂返回零时停止,在配置池时必须通知这一点。您可以在任何被调用的函数中做出反应并补充进程列表,例如在 whileListener 中。如果您需要停止无限执行,则需要将此信息指定给池。

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = [];
$iteration = 0;

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$whileListener = static function() use (&$queue, &$iteration, &$pool):void{
    $iteration++;
    
    if($iteration === 20){
        // Will end execution after executed and 
        // newly generated processes have finished
        $pool->setIsEternal(false);
    }
    
    if($iteration % 5 === 0){
        $queue[] = $iteration;
    }
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->setIsEternal(true);
$pool->execute();

在类中使用

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

class AsyncProcess
{
    private array $indexes;
    private array $results = [];

    public function __construct() {
        $this->indexes = range(1, 10);
    }

    public function processFactory():?Process
    {
        if(count($this->indexes) === 0){
            return null;
        }

        $index = array_shift($this->indexes);
        return Process::fromShellCommandline(sprintf('echo %d', $index));
    }

    public function processCallback(ProcessWrapper $processWrapper):void
    {
        $index = (int)$processWrapper->getOutput();
        $this->results[] = $index;
    }

    public function run():void
    {
        $pool = new Pool();
        $pool
            ->setProcessFactory([$this, 'processFactory'])
            ->setCallback([$this, 'processCallback'])
            ->execute()
        ;
    }
}