kelemen/parallel

并行运行任务的库

2.3.0 2024-05-14 08:24 UTC

This package is auto-updated.

Last update: 2024-09-14 08:57:10 UTC


README

用于并行(并发)任务处理的库。使用 symfony/process 实现。

基本表格输出

output

配置

基本

command.php

#!/usr/bin/env php
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// First setup Parallel class
// If you setup log dir (optional) parallel will automatically create sub folder /stats and log running statistics in json format here.
$parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1);

// Add some tasks (only examples, not part of parallel)
// Task is defined by its name and by dependencies (optional)
// Dependencies ($runAfter = []) are tasks which have to be done before task can start
$parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']);
$parallel->addTask(new \Parallel\UsersTask('task:user'));
$parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']);
$parallel->addTask(new \Parallel\CategoriesTask('task:categories'));

// Some tasks can be too much resource expensive, so we can define how many tasks can run along this task.
// If we setup 0, this task will be run alone although we setup 5 as global max concurrent
$parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), 0);

// Run symfony application under hood
$parallel->runConsoleApp();

现在只需运行

php command.php parallel:run

仅运行已注册任务的子网

有时你可能只想运行已注册任务的子网(例如,在开发中)。为此,请在 parallel:run 命令中使用 --subnet 选项。--subnet 选项作为正则表达式进行验证,并接受多个值。此外,所有不匹配任何子网正则表达式的依赖项都将从匹配的任务中删除。

# This command run only task:user and task:categories tasks
php command.php parallel:run --subnet task:user$ --subnet task:categories$

日志记录

实现了 PSR logger。因此,我们可以使用 monolog/monolog

<?php
// Setup PSR logger
// ...
$parallel->setLogger($psrLogger);

分析命令

Parallel 可以可视化任务依赖图。你只需设置分析目录。输出文件将生成到设置目录,格式为 HTML。

<?php
$parallel->setAnalyzeDir(__DIR__ . '/../log');
# Now you can run
php command.php analyze:graph

任务

每个任务都必须返回某种结果(如 SuccessResult、SkipResult、ErrorResult)作为任务/项目处理的结果。

任务类型

SimpleTask

适用于具有静态输入数据处理的任务。

<?php
class ImplementedSimpleTask extends SimpleTask
{
    protected function processTask(InputInterface $input, OutputInterface $output): TaskResult
    {
        // Do some magic
        return new SuccessResult();
    }
}

ProgressTask

如果需要单独处理来自中等数据集的每个项目,则适用。所有源项目一次提供。在某些情况下,这可能对内存过于昂贵(请参阅 BatchProgressTask)。

<?php
class ImplementedProgressTask extends ProgressTask
{
    protected function items(): iterable
    {
        return DB::table('users');
    }

    protected function itemsCount(): int
    {
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        // It can be anything what is provided by items() method (array, object ...)
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        file_put_contents('active_users', $item['id'] . "/n", FILE_APPEND | LOCK_EX);
        return SuccessResult();
    }
}

BatchProgressTask

最先进的任务。如果需要单独处理来自大型数据集的每个项目,则适用。项目可以分批提供和处理。

<?php
class ImplementedBatchProgressTask extends BatchProgressTask
{
    protected function startup(): void
    {
        // Here you can prepare data
    }

    protected function shutdown(): void
    {
        // Here you can run cleanup
    }

    protected function items(int $processed): iterable
    {
        // Fetch data (eg. from database) by 500 and use offset
        return DB::table('users')->limit(500)->offset($processed);
    }

    protected function itemsCount(): int
    {
        // Count ALL data that will be processed
        return DB::table('users')->count();
    }

    protected function processItem($item): TaskResult
    {
        // $item here is one record form users table
        if (!$item['is_active']) {
            return new SkipResult();
        }
        
        return new SuccessResult([
            'id' => $item['id'],
            'name' => $item['name']
        ]);
    }

    protected function batch(array $items): void
    {
        DB::table('active_users')->insert($items);
    }
}

消息

ProgressTaskBatchProgressTask 中的任何位置都可以使用 sendMessage($message) 方法将消息发送到输出。

TaskGenerator

Parallel 库中的 TaskGenerator 接口允许你动态生成任务。当需要根据可能事先未知的标准或输入数据创建任务时,这非常有用。

创建新的 TaskGenerator

要创建新的 TaskGenerator,实现 TaskGenerator 接口。此接口需要实现两个方法:getName 和 generateTasks。

以下是如何创建新 TaskGenerator 的示例

namespace Parallel\TaskGenerator;

use Parallel\Task;

class CustomTaskGenerator implements TaskGenerator
{
private $name;
private $task;
private $runAfter = [];
private $maxConcurrentTasksCount;
private $chunkSize;

    public function __construct(string $name, Task $task, int $chunksCount, array $runAfter = [], ?int $maxConcurrentTasksCount = null)
    {
        $this->name = $name;
        $this->task = $task;
        $this->chunkSize = $chunksCount;
        $this->runAfter = $runAfter;
        $this->maxConcurrentTasksCount = $maxConcurrentTasksCount;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function generateTasks(): array
    {
        $tasks = [];
        for ($i = 1; $i <= $this->chunkSize; $i++) {
            $tasks[] = new BaseGeneratedTask($this->task, $this->runAfter, $this->maxConcurrentTasksCount);
        }
        return $tasks;
    }
}

将 TaskGenerator 添加到 Parallel

创建 TaskGenerator 后,将其添加到 Parallel 实例。这允许动态生成的任务包含在并行处理中。

$taskGenerator = new CustomTaskGenerator('custom', new \Parallel\SomeTask('task:custom'), 10, ['task:dependency'], 2);
$parallel->addTaskGenerator($taskGenerator);

与生成的任务一起工作

由 TaskGenerator 生成的任务可以像常规任务一样具有依赖关系。在创建生成的任务时指定这些依赖关系。Parallel 库确保这些任务按正确顺序执行。

public function generateTasks(): array
{
    $tasks = [];
    for ($i = 1; $i <= $this->chunkSize; $i++) {
        $tasks[] = new BaseGeneratedTask($this->task, ['task:dependency'], $this->maxConcurrentTasksCount);
    }
    return $tasks;
}

在此示例中,每个生成的任务都依赖于任务:dependency。

为在生成任务之后运行的任务设置依赖关系

要为应在 TaskGenerator 生成的任务之后运行的任务设置依赖关系,请引用生成器的名称。这确保任务将在生成器生成的所有任务完成之前开始。

$parallel->addTask(new \Parallel\SomeOtherTask('task:afterGenerated'), ['custom']);

在此示例中,task:afterGenerated 将仅在 CustomTaskGenerator(名为 custom)生成的所有任务完成后才开始。afterGenerated` 将仅在 CustomTaskGenerator 生成的所有任务完成后才开始。