riki137/multitron

管理任务快速异步和多线程执行的工具。专注于性能和友好的命令行界面。

0.1.2 2024-08-23 09:32 UTC

This package is auto-updated.

Last update: 2024-09-23 09:38:38 UTC


README

Multitron是一个基于PHP的任务编排和执行库,旨在高效地运行和管理复杂任务树。它设计用于处理并行任务执行、进程间通信和资源管理。

⚠️ 这个库仍在早期开发阶段,未来可能会有破坏性变更。然而,它已经在生产中使用并正在积极开发。

特性

  • 任务编排和依赖管理
  • 可配置任务并发级别的并发执行
  • 实时进度更新和日志记录
  • 支持任务分区和高级路由
  • 通过通道和信号量实现共享内存和进程间通信
  • 使用PSR-3和Tracy进行错误处理和日志记录

安装

可以使用Composer安装Multitron

composer require riki137/multitron

使用方法

基本任务树

通过实现Task接口或扩展SimpleTask抽象类来定义自定义任务。使用依赖注入容器将这些任务注入到任务树中。

use Multitron\Container\TaskTree;
use Psr\Container\ContainerInterface;

class MyTaskTree extends TaskTree
{
    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
    }

    public function build(): Generator
    {
        yield $cacheClear = $this->group('cache-clear', function() {
            yield $this->task(ClearCacheTask::class);
            yield $this->task(ClearLogsTask::class);
        });
        yield $this->task(OtherCacheClearTask::class)->belongsTo($cacheClear);
        
        yield $this->task(MyFirstTask::class);
        yield $secondTask = $this->task(MySecondTask::class);
        yield $thirdTask = $this->task(MyThirdTask::class)->dependsOn($secondTask);
        yield $this->partitioned(MyPartitionedTask::class, 4)->dependsOn($thirdTask, $cacheClear);
        
    }
}

运行任务树

要运行任务树,请使用适当的参数实例化Multitron命令类

use Multitron\Multitron;
use Multitron\Error\TracyErrorHandler;
use Symfony\Component\Console\Application;

$taskTree = new MyTaskTree($container);
$bootstrapPath = '/path/to/bootstrap.php'; // Path to the bootstrap file that returns an instance of a PSR container or Nette Container
$concurrentTasks = 4; // Number of concurrent tasks
$errorHandler = new TracyErrorHandler(); // see below for PSR logger

$multitron = new Multitron($taskTree, $bootstrapPath, $concurrentTasks, $errorHandler);

$application = new Application();
$application->add($multitron);
$application->run();

错误日志记录

使用PSR-3日志记录器或Tracy进行错误处理,以配置详细的错误报告。

PSR-3日志记录器

use Monolog\Logger;
use Monolog\Handler\StreamHandler;
use Multitron\Error\PsrLogErrorHandler;

$logger = new Logger('multitron');
$logger->pushHandler(new StreamHandler('path/to/logfile.log', Logger::ERROR));

$errorHandler = new PsrLogErrorHandler($logger);

Tracy

use Multitron\Error\TracyErrorHandler;
$errorHandler = new TracyErrorHandler();

进度报告和日志记录

Multitron提供了实时日志记录和进度更新,可以使用提供的类进行配置。

use Multitron\Output\TableOutput;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;

$tableOutput = new TableOutput();
$tableOutput->configure($inputConfiguration);

任务分区

将任务分区以并行运行任务块,每个任务块在数据子集上操作。

use Multitron\Container\Node\PartitionedTaskGroupNode;

$partitionedNode = new PartitionedTaskGroupNode("MyPartitionedTask", function() use ($container) {
    return $container->get(MyPartitionedTask::class);
}, 4); // Partition into 4 chunks

带有TaskCommunicator的中心缓存

Multitron中的中心缓存为任务提供了一个全局共享内存空间,以便任务可以高效地在不同的任务实例之间读取和写入数据。此功能通过充当execute方法中这些操作的中间人的TaskCommunicator来实现。

TaskCommunicator提供的操作

在任务执行的上下文中,TaskCommunicator提供了四个关键方法,用于与中心缓存交互

  1. $comm->read(string $key): ?array

    此方法允许任务从与指定键关联的中心缓存中读取数据。如果键存在,它检索相应的数据并将其作为数组返回。如果键不存在,则返回null

    示例用法

    $data = $comm->read('task_result');
    if ($data !== null) {
        // Process the retrieved data
    } else {
        // Handle the absence of data
    }
  2. $comm->readSubset(string $key, array $subkeys): ?array

    此方法读取存储在指定键下的数据子集。它接受一个键和一个子键数组。如果存在,则检索每个提供的子键的数据,返回包含子键及其对应值的关联数组。如果任何子键未找到,则简单地从结果中省略它们。

    示例用法

    $subData = $comm->readSubset('user_emails', [123,126]); // Retrieve emails for user IDs 123 and 126
    // Note: Similarly to array_intersect_key(), only the existing keys are returned
    $email = $subData[123] ?? null;
  3. $comm->write(string $key, array &$data): Future

    此方法允许任务将数据写入或更新与指定键关联的中心缓存。它接受键和数据数组的引用。方法异步写入数据,并返回一个Future,允许任务在等待数据写入时继续执行。

    示例用法

    $dataToStore = [123 => 'richard@popelis.sk', 124 => 'fero@example.org'];
    $comm->write('user_emails', $dataToStore)->await();
  4. $comm->merge(string $key, array $data, int $level = 1): Future

    《merge》方法用于将新数据合并到中央缓存中指定键下的现有数据结构中。它需要一个键、要合并的数据以及一个可选的合并级别,表示数组结构应该合并多深。这对于层次或嵌套数据结构特别有用。

    示例用法

    $newResults = ['subtask1' => ['status' => 'done'], 'subtask2' => ['status' => 'pending']];
    $comm->merge('project_results', $newResults, 1)->await();
    // The project_results key will have its data updated without overwriting existing entries

使用总结

因此,《TaskCommunicator》作为通过中央缓存管理共享状态的强大工具。通过利用上述方法,任务可以有效地记录和检索执行所需的数据,增强跨可能复杂且并行化的工作流程的协调和状态管理。

这些方法中的每一个都是为了在任务执行阶段方便和高效地共享数据而设计的,允许开发者构建可扩展和响应的系统,充分利用可用资源,同时保持共享数据的完整性。

贡献

欢迎贡献、问题和功能请求!

如果您想贡献,请随时查看问题页面

许可证

Multitron采用MIT许可证。有关更多详细信息,请参阅LICENSE文件。