wwwision / batch-processing
并行执行批处理 CLI 命令
Requires
- php: >=7.4
- react/child-process: ^0.6.4
Requires (Dev)
- roave/security-advisories: dev-latest
README
帮助并行执行 Flow CLI 命令使用 ReactPHP ChildProcess
安装
通过以下方式安装此包:
composer require wwwision/batch-processing
用法
从 Flow CommandController
中调用 BatchProcessRunner::start()
。
示例
<?php use Neos\Flow\Annotations as Flow; use Wwwision\BatchProcessing\BatchProcessRunner; class SomeCommandController extends CommandController { public function someMainCommand(): void { $runner = new BatchProcessRunner('somebatch'); $runner->start(); } /** * @param int $offset zero based offset * @param int $limit size of the batch to process * @internal */ public function someBatchCommand(int $offset, int $limit): void { // process batch of size $limit from $offset } }
(见以下完整示例)
自定义批命令参数
默认情况下,参数 offset
和 limit
将传递给批命令。这可以通过 BatchProcessRunner
的构造函数参数来更改,可以使用占位符 {offset}
和 {limit}
$runner = new BatchProcessRunner('some:batch:command', ['start' => '{offset}', 'size' => '{limit}', 'some-custom-arg' => 'some value']);
渲染进度条
可以使用 ProgressBarRenderer
输出进度条,如上图所示
$progressHandler = ProgressBarRenderer::create($this->output->getOutput()); $runner = new BatchProcessRunner('some:batch:command', null, $progressHandler);
为了允许各个批次报告它们的进度和错误,可以使用 ProgressPipe
$progressPipe = new ProgressPipe(); for ($i = $offset; $i < ($offset + $limit); $i ++) { try { // do something } catch (SomeException $e) { $progressPipe->error($e->getMessage()); } $progressPipe->set($i); }
指定批量大小
默认情况下,批量大小为 500。这意味着对于总共 1234 个任务,批处理命令将被调用三次,带有 offset/limit
的
0/500
500/500
1000/234
可以通过 BatchProcessRunner::setBatchSize()
调整批量大小。
注意: 批量大小合适的尺寸在很大程度上取决于每个任务将花费多少时间以及它将消耗多少内存。
指定池大小
默认情况下,最多 5 个子进程将并行执行。这可以通过 BatchProcessRunner::setPoolSize()
来调整。
注意: 为了防止并行执行批次,将池大小设置为 1
处理完成的进程
由于 BatchProcessRunner::start()
以 非阻塞 方式执行,以下代码 将不会按预期工作
$runner->start($totalAmountOfTasks); $this->outputLine('This might be outputted before the runner has been finished!')
相反,应该使用 BatchProcessRunner::onFinish()
回调来最终化进程
$runner->onFinish(function() { $this->outputLine('This will be outputted when all tasks have been processed'); }); $runner->start($totalAmountOfTasks);
处理错误
为了防止中断批处理或破坏进度条渲染,默认不输出错误。相反,可以通过 BatchProcessRunner::onError()
回调来处理它们
$runner->onError(function(string $message) { $this->outputLine('<error>%s</error>', $message); })
或者,可以使用 onFinish()
来处理批处理过程中发生的所有错误
$runner->onFinish(function(array $errors) { if ($errors !== []) { $this->outputLine('<error>%d errors occurred!</error>', [count($errors)]); } });
完整示例
<?php use Neos\Flow\Cli\CommandController; use Wwwision\BatchProcessing\BatchProcessRunner; use Wwwision\BatchProcessing\ProgressHandler\NullProgressHandler; use Wwwision\BatchProcessing\ProgressHandler\ProgressBarRenderer; use Wwwision\BatchProcessing\ProgressPipe; class SomeCommandController extends CommandController { public function someMainCommand(string $someArg, int $batchSize = null, int $poolSize = null, bool $quiet = false): void { $numberOfTasks = $this->determineNumberOfTasks(); $quiet || $this->outputLine('Processing <b>%d</b> tasks...', [$numberOfTasks]); $progressHandler = $quiet ? new NullProgressHandler() : ProgressBarRenderer::create($this->output->getOutput()); $runner = new BatchProcessRunner('some.package:some:somebatch', ['someArg' => $someArg, 'offset' => '{offset}', 'limit' => '{limit}'], $progressHandler); if ($batchSize !== null) { $runner->setBatchSize($batchSize); } if ($poolSize !== null) { $runner->setPoolSize($poolSize); } $runner->onFinish(function(array $errors) use ($quiet) { if ($errors === []) { $quiet || $this->outputLine('<success>Done</success>'); return; } $this->outputLine('<error>Finished with <b>%d</b> error%s%s</error>', [\count($errors), \count($errors) === 1 ? '' : 's', $quiet ? '' : ':']); if (!$quiet) { foreach ($errors as $error) { $this->outputLine(' %s', [$error]); } } exit(1); }); $runner->start($numberOfTasks); } /** * @param string $someArg some custom argument * @param int $offset zero based offset * @param int $limit size of the batch to import * @internal */ public function someBatchCommand(string $someArg, int $offset, int $limit): void { $processPipe = new ProgressPipe(); foreach ($this->getTasks($offset, $limit) as $task) { try { $task->process(); } catch (\Throwable $e) { $processPipe->error($e->getMessage()); } $processPipe->advance(); } } }
贡献
以问题或拉取请求形式提供的贡献非常受欢迎
许可证
见 LICENSE