webmozarts / console-parallelization
启用 Symfony Console 命令的并行化
Requires
- php: ^8.1
- fidry/cpu-core-counter: ^0.5.0 || ^1.0
- nikic/iter: ^2.2
- psr/log: ^1.1 || ^2.0 || ^3.0
- symfony/console: ^6.4
- symfony/dependency-injection: ^6.4
- symfony/deprecation-contracts: ^2.5 || ^3.1
- symfony/process: ^6.4
- symfony/service-contracts: ^3.3
- thecodingmachine/safe: ^1.3.3 || ^2.4
- webmozart/assert: ^1.5
Requires (Dev)
- ext-json: *
- bamarni/composer-bin-plugin: ^1.8
- ergebnis/composer-normalize: ^2.28
- fidry/makefile: ^1.0
- infection/infection: ^0.27.0 || ^0.28.0
- jangregor/phpstan-prophecy: ^1.0
- phpspec/prophecy-phpunit: ^2.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^1.8
- phpstan/phpstan-phpunit: ^1.1
- phpstan/phpstan-symfony: ^1.2
- phpstan/phpstan-webmozart-assert: ^1.2
- phpunit/phpunit: ^10.0
- symfony/framework-bundle: ^6.4
- webmozarts/strict-phpunit: ^7.3
- dev-main
- 2.1.2
- 2.1.1
- 2.1.0
- 2.0.2
- 2.0.1
- 2.0.0
- 2.0.0-beta.2
- 2.0.0-beta.1
- 2.0.0-beta.0
- 2.0.0-alpha.0
- 1.x-dev
- 1.2.4
- 1.2.3
- 1.2.2
- 1.2.1
- 1.2.0
- 1.1.0
- 1.0.1
- 1.0.0
- 1.0.0-RC7
- 1.0.0-RC6
- 1.0.0-RC5
- 1.0.0-RC4
- 1.0.0-RC3
- 1.0.0-RC2
- 1.0.0-RC1
- dev-dependabot/github_actions/dot-github/workflows/dependencies-07e5513225
- dev-dependabot/composer/dependencies-22e4308327
This package is auto-updated.
Last update: 2024-09-16 14:02:04 UTC
README
这个库支持对 Symfony Console 命令进行并行化。
工作原理
当您启用多进程启动命令时,主进程会检索 项目 并通过标准输入将它们分配给给定的子进程数量。子进程在处理固定数量的项目(一个 片段)后会被杀死,以防止它们随着时间的推移而减慢。
可选地,子进程的工作可以进一步分成更小的块(批次)。您可以在每个这些批次之前和之后执行某些工作(例如将更改刷新到数据库),以优化命令的性能。
安装
使用 Composer 安装包
composer require webmozarts/console-parallelization
使用方法
要为您的项目添加并行化功能,您可以选择扩展 ParallelCommand
类或使用 Parallelization
特性
use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Webmozarts\Console\Parallelization\ParallelCommand; use Webmozarts\Console\Parallelization\Parallelization; use Webmozarts\Console\Parallelization\Input\ParallelizationInput; class ImportMoviesCommand extends ParallelCommand { public function __construct() { parent::__construct('import:movies'); } protected function configure(): void { parent::configure(); // ... } protected function fetchItems(InputInterface $input, OutputInterface $output): iterable { // open up the file and read movie data... // return items as strings return [ '{"id": 1, "name": "Star Wars"}', '{"id": 2, "name": "Django Unchained"}', // ... ]; } protected function runSingleCommand(string $item, InputInterface $input, OutputInterface $output): void { $movieData = json_decode($item); // insert into the database } protected function getItemName(?int $count): string { if (null === $count) { return 'movie(s)'; } return 1 === $count ? 'movie' : 'movies'; } }
您可以使用常规的 Symfony Console 命令运行此命令
$ bin/console import:movies --main-process Processing 2768 movies in segments of 2768, batches of 50, 1 round, 56 batches in 1 process 2768/2768 [============================] 100% 56 secs/56 secs 32.0 MiB Processed 2768 movies.
或者,如果您愿意,可以使用并行化运行命令
$ bin/console import:movies # or with a specific number of processes instead: $ bin/console import:movies --processes 2 Processing 2768 movies in segments of 50, batches of 50, 56 rounds, 56 batches in 2 processes 2768/2768 [============================] 100% 31 secs/31 secs 32.0 MiB Processed 2768 movies.
API
ParallelCommand 和 Parallelization 特性
此库提供了一个 ParallelCommand
基类和一个 Parallelization
特性。如果您在寻找基本用法,ParallelCommand
应该更容易使用,因为它将严格所需的方法作为抽象方法提供。所有其他钩子都可以通过重写 ::configureParallelExecutableFactory()
方法进行配置。
另一方面,Parallelization
特性默认实现了所有钩子,需要较少的手动任务。但它确实需要调用 ParallelizationInput::configureCommand()
来添加与并行化相关的输入参数和选项。
项目
主进程检索所有需要处理的项目,并通过它们的标准输入(STDIN)将它们传递给子进程。因此,项目必须满足以下两个要求
- 项目必须是字符串
- 项目不得包含换行符
通常,您希望保持项目较小,以便将处理从主进程卸载到子进程。一些典型的项目示例
- 主进程读取文件并将行传递给子进程
- 主进程检索需要更新的数据库行的 ID 并将它们传递给子进程
片段
当您启用多进程运行命令时,通过 fetchItems()
返回的项目将分成固定大小的片段。每个子进程处理一个片段,然后自行杀死。
默认情况下,片段大小与批次大小相同(见下文),但您可以通过选择不同的片段大小来调整命令的性能(理想情况下是批次大小的倍数)。您可以通过重写 getSegmentSize()
方法来实现这一点。
protected function configureParallelExecutableFactory( ParallelExecutorFactory $parallelExecutorFactory, InputInterface $input, OutputInterface $output ): ParallelExecutorFactory { return $parallelExecutorFactory ->withSegmentSize(250); }
批次
默认情况下,批次大小和片段大小相同。如果需要,您可以选择比片段大小更小的批次大小,并在每个批次前后运行自定义代码。您通常会这样做,以便将更改刷新到数据库或释放不再需要的资源。
要在每次批量执行前/后运行代码,请重写钩子 runBeforeBatch()
和 runAfterBatch()
// When using the ParallelCommand protected function runBeforeBatch(InputInterface $input, OutputInterface $output, array $items): void { // e.g. fetch needed resources collectively } protected function runAfterBatch(InputInterface $input, OutputInterface $output, array $items): void { // e.g. flush database changes and free resources } protected function configureParallelExecutableFactory( ParallelExecutorFactory $parallelExecutorFactory, InputInterface $input, OutputInterface $output, ): ParallelExecutorFactory { return $parallelExecutorFactory ->withRunAfterBatch($this->runBeforeBatch(...)) ->withRunAfterBatch($this->runAfterBatch(...)); } // When using the Parallelization trait, this can be simplified a bit: protected function runBeforeBatch( InputInterface $input, OutputInterface $output, array $items ): void { // ... }
您可以通过重写 getBatchSize()
方法来自定义默认的批量大小50
protected function configureParallelExecutableFactory( ParallelExecutorFactory $parallelExecutorFactory, InputInterface $input, OutputInterface $output, ): ParallelExecutorFactory { return $parallelExecutorFactory ->withBatchSize(150); }
配置
该库提供了一系列的配置设置
::getParallelExecutableFactory()
允许您完全配置ParallelExecutorFactory
工厂,从片段、批量大小,到使用的PHP可执行文件,以及任何 进程处理钩子。::configureParallelExecutableFactory()
是一个不同的、更轻量级的扩展点,用于配置ParallelExecutorFactory
工厂。::getContainer()
允许您配置使用哪个容器。默认情况下,如果有应用程序的内核容器,则传递该容器的内核容器。默认错误处理器使用它,在每项操作失败时重置容器,以避免诸如损坏的Doctrine实体管理器等问题。如果您没有使用内核(例如,在Symfony应用程序之外),则默认不返回任何容器。::createErrorHandler()
允许您配置您想要使用的错误处理器。::createLogger()
允许您完全配置您想要的记录器。
钩子
该库支持几种进程钩子,可以通过 ::configureParallelExecutableFactory()
进行配置
*: 当使用 Parallelization
特性时,可以通过重写相应的方法直接配置这些钩子。
订阅的服务
您应该使用 已订阅的服务 或代理。确实,否则您可能会遇到问题,即最初注入到命令中的服务最终可能与容器使用的服务不同。这是因为当发生错误时,将使用 ResetServiceErrorHandler
错误处理器,该处理器在项目失败时重置容器。因此,如果服务不是直接从容器中获取(以获取在容器重置时的新实例),您将使用旧服务。
此问题的常见症状是遇到已关闭的实体管理器问题。
与 Amphp/ReactPHP 的区别
如果您遇到了这个库,想知道它与 Amphp 或 ReactPHP 或其他潜在的并行化库有何不同,本节旨在突出一些差异。
主要差异是并行化机制本身。Amphp或ReactPHP通过启动一个工作进程池并将工作分配给这些工作进程来工作。然而,这个库却启动了一个进程池。更具体地说,差异在于进程的启动方式
- Amphp/ReactPHP工作进程可以共享状态;然而,使用这个库,您不能轻易做到这一点。
- 工作进程可以处理多个作业,而在这个库中,每个片段完成后都会杀死进程。为了将其提高到类似水平,可以将这个库中处理片段的工作视为Amphp/ReactPHP工作进程任务,并且在工作处理完单个任务后杀死工作进程。
另一个差异是,这个库以命令作为其中心点。这提供了以下优势
- 不需要提供额外的上下文:一旦进入子进程,您就像通常一样处于您的命令中。无需自定义引导。
- 命令可以无缝地以并行化或不并行化的方式执行。要模拟子进程的执行,只需使用
--child
选项并通过 STDIN 传递子项即可。 - 通过配置片段和批量大小,更容易适应任务的负载分配和内存泄漏。
贡献
欢迎对该包的贡献!
要运行CS修复工具和测试,可以使用make
命令。更多详细信息请使用make help
命令。
升级
请参阅升级指南。
作者
许可
本软件包的所有内容均受MIT许可证许可。