kelemen / parallel
并行运行任务的库
Requires
- php: >=7.1
- ext-json: *
- psr/log: ^1.1
- symfony/console: ^4.3|^5.0
- symfony/process: ^4.3|^5.0
Requires (Dev)
- monolog/monolog: ^1.9
- phpunit/phpunit: ^7.5
- squizlabs/php_codesniffer: ^3.4
This package is auto-updated.
Last update: 2024-09-14 08:57:10 UTC
README
用于并行(并发)任务处理的库。使用 symfony/process 实现。
基本表格输出
配置
基本
command.php
#!/usr/bin/env php <?php require_once __DIR__ . '/../vendor/autoload.php'; // First setup Parallel class // If you setup log dir (optional) parallel will automatically create sub folder /stats and log running statistics in json format here. $parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1); // Add some tasks (only examples, not part of parallel) // Task is defined by its name and by dependencies (optional) // Dependencies ($runAfter = []) are tasks which have to be done before task can start $parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']); $parallel->addTask(new \Parallel\UsersTask('task:user')); $parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']); $parallel->addTask(new \Parallel\CategoriesTask('task:categories')); // Some tasks can be too much resource expensive, so we can define how many tasks can run along this task. // If we setup 0, this task will be run alone although we setup 5 as global max concurrent $parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), 0); // Run symfony application under hood $parallel->runConsoleApp();
现在只需运行
php command.php parallel:run
仅运行已注册任务的子网
有时你可能只想运行已注册任务的子网(例如,在开发中)。为此,请在 parallel:run
命令中使用 --subnet
选项。--subnet 选项作为正则表达式进行验证,并接受多个值。此外,所有不匹配任何子网正则表达式的依赖项都将从匹配的任务中删除。
# This command run only task:user and task:categories tasks
php command.php parallel:run --subnet task:user$ --subnet task:categories$
日志记录
实现了 PSR logger。因此,我们可以使用 monolog/monolog
。
<?php // Setup PSR logger // ... $parallel->setLogger($psrLogger);
分析命令
Parallel 可以可视化任务依赖图。你只需设置分析目录。输出文件将生成到设置目录,格式为 HTML。
<?php $parallel->setAnalyzeDir(__DIR__ . '/../log');
# Now you can run
php command.php analyze:graph
任务
每个任务都必须返回某种结果(如 SuccessResult、SkipResult、ErrorResult)作为任务/项目处理的结果。
任务类型
SimpleTask
适用于具有静态输入数据处理的任务。
<?php class ImplementedSimpleTask extends SimpleTask { protected function processTask(InputInterface $input, OutputInterface $output): TaskResult { // Do some magic return new SuccessResult(); } }
ProgressTask
如果需要单独处理来自中等数据集的每个项目,则适用。所有源项目一次提供。在某些情况下,这可能对内存过于昂贵(请参阅 BatchProgressTask)。
<?php class ImplementedProgressTask extends ProgressTask { protected function items(): iterable { return DB::table('users'); } protected function itemsCount(): int { return DB::table('users')->count(); } protected function processItem($item): TaskResult { // $item here is one record form users table // It can be anything what is provided by items() method (array, object ...) if (!$item['is_active']) { return new SkipResult(); } file_put_contents('active_users', $item['id'] . "/n", FILE_APPEND | LOCK_EX); return SuccessResult(); } }
BatchProgressTask
最先进的任务。如果需要单独处理来自大型数据集的每个项目,则适用。项目可以分批提供和处理。
<?php class ImplementedBatchProgressTask extends BatchProgressTask { protected function startup(): void { // Here you can prepare data } protected function shutdown(): void { // Here you can run cleanup } protected function items(int $processed): iterable { // Fetch data (eg. from database) by 500 and use offset return DB::table('users')->limit(500)->offset($processed); } protected function itemsCount(): int { // Count ALL data that will be processed return DB::table('users')->count(); } protected function processItem($item): TaskResult { // $item here is one record form users table if (!$item['is_active']) { return new SkipResult(); } return new SuccessResult([ 'id' => $item['id'], 'name' => $item['name'] ]); } protected function batch(array $items): void { DB::table('active_users')->insert($items); } }
消息
在 ProgressTask
和 BatchProgressTask
中的任何位置都可以使用 sendMessage($message)
方法将消息发送到输出。
TaskGenerator
Parallel 库中的 TaskGenerator 接口允许你动态生成任务。当需要根据可能事先未知的标准或输入数据创建任务时,这非常有用。
创建新的 TaskGenerator
要创建新的 TaskGenerator,实现 TaskGenerator 接口。此接口需要实现两个方法:getName 和 generateTasks。
以下是如何创建新 TaskGenerator 的示例
namespace Parallel\TaskGenerator; use Parallel\Task; class CustomTaskGenerator implements TaskGenerator { private $name; private $task; private $runAfter = []; private $maxConcurrentTasksCount; private $chunkSize; public function __construct(string $name, Task $task, int $chunksCount, array $runAfter = [], ?int $maxConcurrentTasksCount = null) { $this->name = $name; $this->task = $task; $this->chunkSize = $chunksCount; $this->runAfter = $runAfter; $this->maxConcurrentTasksCount = $maxConcurrentTasksCount; } public function getName(): string { return $this->name; } public function generateTasks(): array { $tasks = []; for ($i = 1; $i <= $this->chunkSize; $i++) { $tasks[] = new BaseGeneratedTask($this->task, $this->runAfter, $this->maxConcurrentTasksCount); } return $tasks; } }
将 TaskGenerator 添加到 Parallel
创建 TaskGenerator 后,将其添加到 Parallel 实例。这允许动态生成的任务包含在并行处理中。
$taskGenerator = new CustomTaskGenerator('custom', new \Parallel\SomeTask('task:custom'), 10, ['task:dependency'], 2); $parallel->addTaskGenerator($taskGenerator);
与生成的任务一起工作
由 TaskGenerator 生成的任务可以像常规任务一样具有依赖关系。在创建生成的任务时指定这些依赖关系。Parallel 库确保这些任务按正确顺序执行。
public function generateTasks(): array { $tasks = []; for ($i = 1; $i <= $this->chunkSize; $i++) { $tasks[] = new BaseGeneratedTask($this->task, ['task:dependency'], $this->maxConcurrentTasksCount); } return $tasks; }
在此示例中,每个生成的任务都依赖于任务:dependency。
为在生成任务之后运行的任务设置依赖关系
要为应在 TaskGenerator 生成的任务之后运行的任务设置依赖关系,请引用生成器的名称。这确保任务将在生成器生成的所有任务完成之前开始。
$parallel->addTask(new \Parallel\SomeOtherTask('task:afterGenerated'), ['custom']);
在此示例中,task:afterGenerated
将仅在 CustomTaskGenerator(名为 custom
)生成的所有任务完成后才开始。afterGenerated` 将仅在 CustomTaskGenerator 生成的所有任务完成后才开始。