webmozarts/console-parallelization

启用 Symfony Console 命令的并行化

2.1.2 2024-04-01 21:58 UTC

README

这个库支持对 Symfony Console 命令进行并行化。

工作原理

当您启用多进程启动命令时,主进程会检索 项目 并通过标准输入将它们分配给给定的子进程数量。子进程在处理固定数量的项目(一个 片段)后会被杀死,以防止它们随着时间的推移而减慢。

可选地,子进程的工作可以进一步分成更小的块(批次)。您可以在每个这些批次之前和之后执行某些工作(例如将更改刷新到数据库),以优化命令的性能。

安装

使用 Composer 安装包

composer require webmozarts/console-parallelization

使用方法

要为您的项目添加并行化功能,您可以选择扩展 ParallelCommand 类或使用 Parallelization 特性

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webmozarts\Console\Parallelization\ParallelCommand;
use Webmozarts\Console\Parallelization\Parallelization;
use Webmozarts\Console\Parallelization\Input\ParallelizationInput;

class ImportMoviesCommand extends ParallelCommand
{
    public function __construct()
    {
        parent::__construct('import:movies');
    }

    protected function configure(): void
    {
        parent::configure();
        
        // ...
    }

    protected function fetchItems(InputInterface $input, OutputInterface $output): iterable
    {
        // open up the file and read movie data...

        // return items as strings
        return [
            '{"id": 1, "name": "Star Wars"}',
            '{"id": 2, "name": "Django Unchained"}',
            // ...
        ];
    }

    protected function runSingleCommand(string $item, InputInterface $input, OutputInterface $output): void
    {
        $movieData = json_decode($item);
   
        // insert into the database
    }

    protected function getItemName(?int $count): string
    {
        if (null === $count) {
            return 'movie(s)';
        }

        return 1 === $count ? 'movie' : 'movies';
    }
}

您可以使用常规的 Symfony Console 命令运行此命令

$ bin/console import:movies --main-process
Processing 2768 movies in segments of 2768, batches of 50, 1 round, 56 batches in 1 process

 2768/2768 [============================] 100% 56 secs/56 secs 32.0 MiB
            
Processed 2768 movies.

或者,如果您愿意,可以使用并行化运行命令

$ bin/console import:movies
# or with a specific number of processes instead:
$ bin/console import:movies --processes 2
Processing 2768 movies in segments of 50, batches of 50, 56 rounds, 56 batches in 2 processes

 2768/2768 [============================] 100% 31 secs/31 secs 32.0 MiB
            
Processed 2768 movies.

API

ParallelCommand 和 Parallelization 特性

此库提供了一个 ParallelCommand 基类和一个 Parallelization 特性。如果您在寻找基本用法,ParallelCommand 应该更容易使用,因为它将严格所需的方法作为抽象方法提供。所有其他钩子都可以通过重写 ::configureParallelExecutableFactory() 方法进行配置。

另一方面,Parallelization 特性默认实现了所有钩子,需要较少的手动任务。但它确实需要调用 ParallelizationInput::configureCommand() 来添加与并行化相关的输入参数和选项。

项目

主进程检索所有需要处理的项目,并通过它们的标准输入(STDIN)将它们传递给子进程。因此,项目必须满足以下两个要求

  • 项目必须是字符串
  • 项目不得包含换行符

通常,您希望保持项目较小,以便将处理从主进程卸载到子进程。一些典型的项目示例

  • 主进程读取文件并将行传递给子进程
  • 主进程检索需要更新的数据库行的 ID 并将它们传递给子进程

片段

当您启用多进程运行命令时,通过 fetchItems() 返回的项目将分成固定大小的片段。每个子进程处理一个片段,然后自行杀死。

默认情况下,片段大小与批次大小相同(见下文),但您可以通过选择不同的片段大小来调整命令的性能(理想情况下是批次大小的倍数)。您可以通过重写 getSegmentSize() 方法来实现这一点。

protected function configureParallelExecutableFactory(
      ParallelExecutorFactory $parallelExecutorFactory,
      InputInterface $input,
      OutputInterface $output
): ParallelExecutorFactory {
    return $parallelExecutorFactory
        ->withSegmentSize(250);
}

批次

默认情况下,批次大小和片段大小相同。如果需要,您可以选择比片段大小更小的批次大小,并在每个批次前后运行自定义代码。您通常会这样做,以便将更改刷新到数据库或释放不再需要的资源。

要在每次批量执行前/后运行代码,请重写钩子 runBeforeBatch()runAfterBatch()

// When using the ParallelCommand
protected function runBeforeBatch(InputInterface $input, OutputInterface $output, array $items): void
{
    // e.g. fetch needed resources collectively
}

protected function runAfterBatch(InputInterface $input, OutputInterface $output, array $items): void
{
    // e.g. flush database changes and free resources
}

protected function configureParallelExecutableFactory(
      ParallelExecutorFactory $parallelExecutorFactory,
      InputInterface $input,
      OutputInterface $output,
): ParallelExecutorFactory {
    return $parallelExecutorFactory
        ->withRunAfterBatch($this->runBeforeBatch(...))
        ->withRunAfterBatch($this->runAfterBatch(...));
}

// When using the Parallelization trait, this can be simplified a bit:
protected function runBeforeBatch(
    InputInterface $input,
    OutputInterface $output,
    array $items
): void {
    // ...
}

您可以通过重写 getBatchSize() 方法来自定义默认的批量大小50

protected function configureParallelExecutableFactory(
      ParallelExecutorFactory $parallelExecutorFactory,
      InputInterface $input,
      OutputInterface $output,
): ParallelExecutorFactory {
    return $parallelExecutorFactory
        ->withBatchSize(150);
}

配置

该库提供了一系列的配置设置

  • ::getParallelExecutableFactory() 允许您完全配置 ParallelExecutorFactory 工厂,从片段、批量大小,到使用的PHP可执行文件,以及任何 进程处理钩子
  • ::configureParallelExecutableFactory() 是一个不同的、更轻量级的扩展点,用于配置 ParallelExecutorFactory 工厂。
  • ::getContainer() 允许您配置使用哪个容器。默认情况下,如果有应用程序的内核容器,则传递该容器的内核容器。默认错误处理器使用它,在每项操作失败时重置容器,以避免诸如损坏的Doctrine实体管理器等问题。如果您没有使用内核(例如,在Symfony应用程序之外),则默认不返回任何容器。
  • ::createErrorHandler() 允许您配置您想要使用的错误处理器。
  • ::createLogger() 允许您完全配置您想要的记录器。

钩子

该库支持几种进程钩子,可以通过 ::configureParallelExecutableFactory() 进行配置

*: 当使用 Parallelization 特性时,可以通过重写相应的方法直接配置这些钩子。

订阅的服务

您应该使用 已订阅的服务 或代理。确实,否则您可能会遇到问题,即最初注入到命令中的服务最终可能与容器使用的服务不同。这是因为当发生错误时,将使用 ResetServiceErrorHandler 错误处理器,该处理器在项目失败时重置容器。因此,如果服务不是直接从容器中获取(以获取在容器重置时的新实例),您将使用旧服务。

此问题的常见症状是遇到已关闭的实体管理器问题。

与 Amphp/ReactPHP 的区别

如果您遇到了这个库,想知道它与 AmphpReactPHP 或其他潜在的并行化库有何不同,本节旨在突出一些差异。

主要差异是并行化机制本身。Amphp或ReactPHP通过启动一个工作进程池并将工作分配给这些工作进程来工作。然而,这个库却启动了一个进程池。更具体地说,差异在于进程的启动方式

  • Amphp/ReactPHP工作进程可以共享状态;然而,使用这个库,您不能轻易做到这一点。
  • 工作进程可以处理多个作业,而在这个库中,每个片段完成后都会杀死进程。为了将其提高到类似水平,可以将这个库中处理片段的工作视为Amphp/ReactPHP工作进程任务,并且在工作处理完单个任务后杀死工作进程。

另一个差异是,这个库以命令作为其中心点。这提供了以下优势

  • 不需要提供额外的上下文:一旦进入子进程,您就像通常一样处于您的命令中。无需自定义引导。
  • 命令可以无缝地以并行化或不并行化的方式执行。要模拟子进程的执行,只需使用 --child 选项并通过 STDIN 传递子项即可。
  • 通过配置片段和批量大小,更容易适应任务的负载分配和内存泄漏。

贡献

欢迎对该包的贡献!

要运行CS修复工具和测试,可以使用make命令。更多详细信息请使用make help命令。

升级

请参阅升级指南

作者

许可

本软件包的所有内容均受MIT许可证许可。