soarce / parallel-process-dispatcher
一个用于在后台和/或并行运行作业的迷你PHP库
6.1.0
2023-11-29 20:41 UTC
Requires
- php: >=8.0,<8.3
Requires (Dev)
Replaces
This package is auto-updated.
Last update: 2024-08-29 22:44:32 UTC
README
这个微库包含两个类。其中一个封装了一个(Linux命令行)进程到对象中,并允许异步运行而不产生死锁。另一个是多进程调度器,它接受任意数量的上述进程并将它们同时运行(最多并发进程数)。
使用示例
- 调度长时间运行的cron作业,例如主要等待webservice响应的作业(可以运行比最大CPU更多的进程)
- 运行监听队列的后台工作者(最大应为CPU数量)
- 同时在Web应用程序中运行命令行任务,例如PDF生成、图像处理等。
安装
将以下内容添加到您的 composer.json
{
"require": {
"soarce/parallel-process-dispatcher": "*"
}
}
或者在项目的根目录下运行以下命令
$ composer require "soarce/parallel-process-dispatcher"
使用方法
类
进程
$process = new Process('pngcrush --brute background.png'); $process->start(); // optional: do something else in your application while (! $process->isFinished() ) { usleep(1000); //wait 1ms until next poll } echo $process->getOutput();
调度器
$process1 = new Process('pngcrush --brute background.png'); $process2 = new Process('pngcrush --brute welcome.png'); $process3 = new Process('pngcrush --brute logo.png'); $dispatcher = new Dispatcher(2); // will make sure only two of those will actually run at the same time $dispatcher->addProcess($process1); $dispatcher->addProcess($process2); $dispatcher->addProcess($process3); $dispatcher->dispatch(); // this will run until all processes are finished. $processes = $dispatcher->getFinishedProcesses(); foreach ($processes as $process) { echo $process->getOutput(), "\n\n"; }
高级
使用进程和调度器启动多个进程,然后收集结果
$dispatcher = new Dispatcher(2); $process1 = new Process('pngcrush --brute background.png'); $dispatcher->addProcess($process1, true); // true starts the process if there are still free slots // [... more code ...] $process2 = new Process('pngcrush --brute welcome.png'); $dispatcher->addProcess($process2, true); // [... more code ...] // during code execution, the dispatcher cannot remove finished processes from the stack, so you have to call the tick()-function // if you want the queue to advance - but it's optional since at latest the __destruct() function will call dispatch(); $dispatcher->tick(); // [... more code ...] $dispatcher->dispatch(); // this will make the dispatcher wait until all the processes are finished, if they are still running $processes = $dispatcher->getFinishedProcesses(); // loop over results
在作业运行时读取输出
一个可能的用例是运行多个爬虫在不同的慢速文件系统或网站上,以生成某些可下载或备份数据文件的列表,同时主进程跟踪列表,删除重复项并写入备份。
$dispatcher = new Dispatcher(2); $dispatcher->addProcess(new ProcessLineOutput("...", 'job1')); $dispatcher->addProcess(new ProcessLineOutput("...", 'job2')); $dispatcher->addProcess(new ProcessLineOutput("...", 'job3')); $oa = new OutputAggregator($dispatcher); foreach ($oa->getOutput() as $job => $line) { echo $job, ': ', $line; }
函数 OutputAggregator::getOutput() 返回一个生成器,该生成器返回作业的名称(此处为job1-3)作为键,输出行作为值。与数组不同,键几乎肯定会出现多次。
已知问题
进程
- PHP内部:请注意,如果子进程产生输出,它将写入一个缓冲区,直到缓冲区满。如果缓冲区满了,子进程会暂停,直到父进程从缓冲区读取并腾出空间。这是在isFinished()方法中完成的。调度器定期调用此方法以防止死锁。如果您单独使用进程类,有多种方法可以防止这种情况:
- 在循环中自行调用isFinished(),使用tick函数或其他方式在脚本执行期间
- 而不是写入stdOut,将输出重定向到临时文件,并使用其名称作为输出。
- 使用OutputAggregator和ProcessLineOutput的组合,并在输出到达时立即处理输出。如果您的作业生成大量输出,这可能会消耗大量RAM作为缓冲区,因此请设置足够高的限制。
调度器
- 多个调度器(在不同进程中)互不感知。因此,如果您有一个使用调度器调用另一个脚本的脚本,该脚本本身又使用调度器来生成多个进程,那么您将最终得到超过最大值的子进程,因此请相应地选择最大值或使用队列(例如Redis)并通过例如在redis栈中注册正在运行的工人使工作者相互感知。