toalett / multiprocessing
易于多进程处理的上下文
Requires
- php: >=7.4
- ext-pcntl: *
- evenement/evenement: ^3.0
- react/event-loop: ^1.1
Requires (Dev)
- phpunit/phpunit: ^9.5
This package is auto-updated.
Last update: 2024-09-13 23:56:50 UTC
README
欢迎来到 Toalett,这是一个围绕“所有软件都是💩”这一理念发起的谦逊的倡议。
Toalett 是挪威语中“马桶”的意思,听起来比“马桶”更高端。
为什么选择 toalett/multiprocessing?
多进程处理 是一种在 PHP (cli) 应用程序中经常使用的技术,用于异步执行任务。由于 PHP 缺乏原生的 多线程,开发者必须依赖于传统的多进程处理来做到这一点。
我们经常看到为了完成这项任务而编写的快速且草率的代码,其中包含对 pcntl_fork() 的调用隐藏在某个地方,导致实现方式丑陋。
Toalett 并不反对快速且草率的 PHP 代码,但鉴于多进程处理如此普遍,使用这个库可能是个不错的选择。
好吧,如何使用它?
使用 composer 安装
$ composer require toalett/multiprocessing
结构
该库提供了一个用于管理多进程处理的单一类:Context。它内部使用 react/event-loop,并通过简单的(但优雅的)evenement/evenement 库发出事件。它将任务委托给内部的 Workers 组件,该组件负责创建和管理子进程。
创建一个 Context
该库包含了一个 ContextBuilder 类,用于构建 Context。它可以提供一个 Concurrency 限制(默认为无限制),一个自定义的 \React\EventLoop\LoopInterface 实例,以及一个在哪个时间间隔内应该执行子进程清理的 Interval。要创建一个 Context,只需调用 build() 方法
use Toalett\Multiprocessing\ContextBuilder; $builder = ContextBuilder::create(); $context = $builder->build();
提交一个任务
使用 Context::submit 方法提交一个任务
use Toalett\Multiprocessing\ContextBuilder; $context = ContextBuilder::create()->build(); $job = static function(string $name) { print("Hello from {$name}!\n"); usleep(500_000); print("Goodbye from ${name}!\n"); }; $context->submit($job, 'John Snow');
任务将在调用 Context::run 方法之前不执行。
为了使用最多两个进程执行此任务 5 次,我们会这样做
use Toalett\Multiprocessing\Concurrency; use Toalett\Multiprocessing\ContextBuilder; $context = ContextBuilder::create() ->withConcurrency(Concurrency::atMost(2)) ->build(); // $job = function(...)... foreach(['John', 'Stannis', 'Jorah', 'Robert', 'Daario'] as $name) { $context->submit($job, $name); } $context->run();
如果您想使用间隔提交一个任务,建议您使用自定义的事件循环而不是 sleep() 或 usleep(),以防止阻塞主进程(从而暂停事件循环)
use React\EventLoop\Factory; use Toalett\Multiprocessing\ContextBuilder; $loop = Factory::create(); $context = ContextBuilder::create() ->withEventLoop($loop) ->build(); // Submit a job every 5 seconds $loop->addPeriodicTimer(5.0, fn() => $context->submit(...)); $context->run();
事件
当发生有趣的事情时,上下文会发出事件。您可以使用 Context::on 方法添加事件监听器
$context->on('name_of_event', fn() => ...);
以下是上下文发出的这些事件
bootedworker_startedworker_stoppedcongestioncongestion_relievedno_workers_remainingstopped
1. booted 事件
在调用 $context->run() 之后,此事件会被发出。这是上下文发出的第一个事件。它会在事件循环启动后立即发出。
2. worker_started 事件
当工作进程启动(进程已派生)时,会触发此事件。子进程的PID作为参数传递给监听器。
3. worker_stopped 事件
当工作进程停止(子进程已停止)时,会触发此事件。子进程的PID作为参数传递给监听器。
4. congestion 事件
当达到施加的并发限制时,会触发此事件。例如,当并发设置为最多2个子进程,而已经有2个任务正在运行时,提交第三个任务会导致此事件发生。系统会天真地等待一个子进程停止,然后再启动另一个工作进程。
5. congestion_relieved 事件
当拥堵缓解时,会触发此事件。这意味着一个子进程已停止,允许执行新任务。
6. no_workers_remaining 事件
当没有运行中的工作进程时,会触发此事件。这通常意味着没有更多工作要做。当发生此事件时,可以自动停止上下文。这在第一个和最后一个示例中已展示。
7. stopped 事件
可以通过调用Context::stop来停止上下文。当工作进程和事件循环成功停止后,上下文会触发一个stopped事件。
示例
对于大多数开发者来说,通过查看示例来学习东西是最快的方法。提供了三个可执行示例。
使用事件计算停止的工作进程数量
这是一个简单的示例,展示了创建50个作业时的事件触发。每当一个作业停止时,计数器就会增加。当所有作业都完成后,上下文会停止。
可以将清理间隔设置为一个很小的值,以提高响应性。
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\Task\Interval; const NUM_JOBS = 50; $context = ContextBuilder::create() ->withCleanupInterval(Interval::seconds(0.5)) ->build(); $counter = new Counter(); $context->on('worker_stopped', [$counter, 'increment']); $context->on('no_workers_remaining', [$context, 'stop']); $context->on('stopped', fn() => printf(" %d\n", $counter->value)); for ($i = 0; $i < NUM_JOBS; $i++) { $context->submit(fn() => sleep(2)); print('.'); } $context->run();
使用4个工作进程触发拥堵
此示例比上一个示例更详细。它用于演示拥堵以及上下文如何处理拥堵:上下文会简单地阻止所有执行,直到一个工作进程停止并且有空间可用。
请关注输出中出现的'C'。这表示拥堵:无法启动工作进程。
use React\EventLoop\Factory; use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\Concurrency; $loop = Factory::create(); $context = ContextBuilder::create() ->withEventLoop($loop) ->withConcurrency(Concurrency::atMost(4)) ->build(); $context->on('booted', fn() => print("🚽 toalett context booted\n")); $context->on('congestion', fn() => print('C')); $context->on('congestion_relieved', fn() => print('R')); $context->on('worker_started', fn() => print('+')); $context->on('worker_stopped', fn() => print('-')); // A job is submitted to the context every second. // The job sleeps for a random amount of seconds (0 - 10). $loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10))); print("Press CTRL+C to stop.\n"); $context->run();
带有Job类的单个工作进程
由于任务实际上只是一个Closure,因此也可以提交一个实现了__invoke()魔术方法的对象。
在此示例中,执行限制在一个工作进程内,作业是Job类的实例。
use Toalett\Multiprocessing\Concurrency; use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\Task\Interval; $context = ContextBuilder::create() ->withConcurrency(Concurrency::singleWorker()) ->withCleanupInterval(Interval::seconds(0.2)) ->build(); for ($i = 0; $i < 3; $i++) { $title = md5(mt_rand()); $context->submit(new Job($title)); } $context->on('no_workers_remaining', [$context, 'stop']); $context->run();
测试
测试可以在src/Tests目录中找到。