soarce/parallel-process-dispatcher

一个用于在后台和/或并行运行作业的迷你PHP库

6.1.0 2023-11-29 20:41 UTC

README

这个微库包含两个类。其中一个封装了一个(Linux命令行)进程到对象中,并允许异步运行而不产生死锁。另一个是多进程调度器,它接受任意数量的上述进程并将它们同时运行(最多并发进程数)。

使用示例

  • 调度长时间运行的cron作业,例如主要等待webservice响应的作业(可以运行比最大CPU更多的进程)
  • 运行监听队列的后台工作者(最大应为CPU数量)
  • 同时在Web应用程序中运行命令行任务,例如PDF生成、图像处理等。

安装

将以下内容添加到您的 composer.json

{
    "require": {
        "soarce/parallel-process-dispatcher": "*"
    }
}

或者在项目的根目录下运行以下命令

$ composer require "soarce/parallel-process-dispatcher"

使用方法

进程

$process = new Process('pngcrush --brute background.png');
$process->start();

// optional: do something else in your application

while (! $process->isFinished() ) {
    usleep(1000); //wait 1ms until next poll
}
echo $process->getOutput();

调度器

$process1 = new Process('pngcrush --brute background.png');
$process2 = new Process('pngcrush --brute welcome.png'); 
$process3 = new Process('pngcrush --brute logo.png'); 

$dispatcher = new Dispatcher(2);    // will make sure only two of those will actually run at the same time
$dispatcher->addProcess($process1);
$dispatcher->addProcess($process2);
$dispatcher->addProcess($process3);

$dispatcher->dispatch();  // this will run until all processes are finished.

$processes = $dispatcher->getFinishedProcesses();

foreach ($processes as $process) {
    echo $process->getOutput(), "\n\n";
}

高级

使用进程和调度器启动多个进程,然后收集结果

$dispatcher = new Dispatcher(2);

$process1 = new Process('pngcrush --brute background.png');
$dispatcher->addProcess($process1, true);   // true starts the process if there are still free slots

// [... more code ...]

$process2 = new Process('pngcrush --brute welcome.png'); 
$dispatcher->addProcess($process2, true);

// [... more code ...]

// during code execution, the dispatcher cannot remove finished processes from the stack, so you have to call the tick()-function
// if you want the queue to advance - but it's optional since at latest the __destruct() function will call dispatch(); 
$dispatcher->tick();

// [... more code ...]

$dispatcher->dispatch();  // this will make the dispatcher wait until all the processes are finished, if they are still running

$processes = $dispatcher->getFinishedProcesses();

// loop over results

在作业运行时读取输出

一个可能的用例是运行多个爬虫在不同的慢速文件系统或网站上,以生成某些可下载或备份数据文件的列表,同时主进程跟踪列表,删除重复项并写入备份。

$dispatcher = new Dispatcher(2);

$dispatcher->addProcess(new ProcessLineOutput("...", 'job1'));
$dispatcher->addProcess(new ProcessLineOutput("...", 'job2'));
$dispatcher->addProcess(new ProcessLineOutput("...", 'job3'));

$oa = new OutputAggregator($dispatcher);
foreach ($oa->getOutput() as $job => $line) {
    echo $job, ': ', $line;
}

函数 OutputAggregator::getOutput() 返回一个生成器,该生成器返回作业的名称(此处为job1-3)作为键,输出行作为值。与数组不同,键几乎肯定会出现多次。

已知问题

进程

  • PHP内部:请注意,如果子进程产生输出,它将写入一个缓冲区,直到缓冲区满。如果缓冲区满了,子进程会暂停,直到父进程从缓冲区读取并腾出空间。这是在isFinished()方法中完成的。调度器定期调用此方法以防止死锁。如果您单独使用进程类,有多种方法可以防止这种情况:
    • 在循环中自行调用isFinished(),使用tick函数或其他方式在脚本执行期间
    • 而不是写入stdOut,将输出重定向到临时文件,并使用其名称作为输出。
    • 使用OutputAggregator和ProcessLineOutput的组合,并在输出到达时立即处理输出。如果您的作业生成大量输出,这可能会消耗大量RAM作为缓冲区,因此请设置足够高的限制。

调度器

  • 多个调度器(在不同进程中)互不感知。因此,如果您有一个使用调度器调用另一个脚本的脚本,该脚本本身又使用调度器来生成多个进程,那么您将最终得到超过最大值的子进程,因此请相应地选择最大值或使用队列(例如Redis)并通过例如在redis栈中注册正在运行的工人使工作者相互感知。