toalett/multiprocessing

易于多进程处理的上下文

v1.1 2020-12-14 16:02 UTC

This package is auto-updated.

Last update: 2024-09-13 23:56:50 UTC


README

欢迎来到 Toalett,这是一个围绕“所有软件都是💩”这一理念发起的谦逊的倡议。
Toalett 是挪威语中“马桶”的意思,听起来比“马桶”更高端。

为什么选择 toalett/multiprocessing

多进程处理 是一种在 PHP (cli) 应用程序中经常使用的技术,用于异步执行任务。由于 PHP 缺乏原生的 多线程,开发者必须依赖于传统的多进程处理来做到这一点。

我们经常看到为了完成这项任务而编写的快速且草率的代码,其中包含对 pcntl_fork() 的调用隐藏在某个地方,导致实现方式丑陋。

Toalett 并不反对快速且草率的 PHP 代码,但鉴于多进程处理如此普遍,使用这个库可能是个不错的选择。

好吧,如何使用它?

使用 composer 安装

$ composer require toalett/multiprocessing

结构

该库提供了一个用于管理多进程处理的单一类:Context。它内部使用 react/event-loop,并通过简单的(但优雅的)evenement/evenement 库发出事件。它将任务委托给内部的 Workers 组件,该组件负责创建和管理子进程。

创建一个 Context

该库包含了一个 ContextBuilder 类,用于构建 Context。它可以提供一个 Concurrency 限制(默认为无限制),一个自定义的 \React\EventLoop\LoopInterface 实例,以及一个在哪个时间间隔内应该执行子进程清理的 Interval。要创建一个 Context,只需调用 build() 方法

use Toalett\Multiprocessing\ContextBuilder;

$builder = ContextBuilder::create();
$context = $builder->build();

提交一个任务

使用 Context::submit 方法提交一个任务

use Toalett\Multiprocessing\ContextBuilder;

$context = ContextBuilder::create()->build();

$job = static function(string $name) {
    print("Hello from {$name}!\n");
    usleep(500_000);
    print("Goodbye from ${name}!\n");
};

$context->submit($job, 'John Snow');

任务将在调用 Context::run 方法之前不执行。
为了使用最多两个进程执行此任务 5 次,我们会这样做

use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder;

$context = ContextBuilder::create()
    ->withConcurrency(Concurrency::atMost(2))
    ->build();

// $job = function(...)...

foreach(['John', 'Stannis', 'Jorah', 'Robert', 'Daario'] as $name) {
    $context->submit($job, $name);
}

$context->run();

如果您想使用间隔提交一个任务,建议您使用自定义的事件循环而不是 sleep()usleep(),以防止阻塞主进程(从而暂停事件循环)

use React\EventLoop\Factory;
use Toalett\Multiprocessing\ContextBuilder;

$loop = Factory::create();
$context = ContextBuilder::create()
    ->withEventLoop($loop)
    ->build();

// Submit a job every 5 seconds
$loop->addPeriodicTimer(5.0, fn() => $context->submit(...));
$context->run();

事件

当发生有趣的事情时,上下文会发出事件。您可以使用 Context::on 方法添加事件监听器

$context->on('name_of_event', fn() => ...);

以下是上下文发出的这些事件

  1. booted
  2. worker_started
  3. worker_stopped
  4. congestion
  5. congestion_relieved
  6. no_workers_remaining
  7. stopped

1. booted 事件

在调用 $context->run() 之后,此事件会被发出。这是上下文发出的第一个事件。它会在事件循环启动后立即发出。

2. worker_started 事件

当工作进程启动(进程已派生)时,会触发此事件。子进程的PID作为参数传递给监听器。

3. worker_stopped 事件

当工作进程停止(子进程已停止)时,会触发此事件。子进程的PID作为参数传递给监听器。

4. congestion 事件

当达到施加的并发限制时,会触发此事件。例如,当并发设置为最多2个子进程,而已经有2个任务正在运行时,提交第三个任务会导致此事件发生。系统会天真地等待一个子进程停止,然后再启动另一个工作进程。

5. congestion_relieved 事件

当拥堵缓解时,会触发此事件。这意味着一个子进程已停止,允许执行新任务。

6. no_workers_remaining 事件

当没有运行中的工作进程时,会触发此事件。这通常意味着没有更多工作要做。当发生此事件时,可以自动停止上下文。这在第一个和最后一个示例中已展示。

7. stopped 事件

可以通过调用Context::stop来停止上下文。当工作进程和事件循环成功停止后,上下文会触发一个stopped事件。

示例

对于大多数开发者来说,通过查看示例来学习东西是最快的方法。提供了三个可执行示例。

使用事件计算停止的工作进程数量

这是一个简单的示例,展示了创建50个作业时的事件触发。每当一个作业停止时,计数器就会增加。当所有作业都完成后,上下文会停止。

可以将清理间隔设置为一个很小的值,以提高响应性。

use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;

const NUM_JOBS = 50;

$context = ContextBuilder::create()
    ->withCleanupInterval(Interval::seconds(0.5))
    ->build();

$counter = new Counter();
$context->on('worker_stopped', [$counter, 'increment']);
$context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf(" %d\n", $counter->value));

for ($i = 0; $i < NUM_JOBS; $i++) {
    $context->submit(fn() => sleep(2));
    print('.');
}

$context->run();

使用4个工作进程触发拥堵

此示例比上一个示例更详细。它用于演示拥堵以及上下文如何处理拥堵:上下文会简单地阻止所有执行,直到一个工作进程停止并且有空间可用。

请关注输出中出现的'C'。这表示拥堵:无法启动工作进程。

use React\EventLoop\Factory;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Concurrency;

$loop = Factory::create();
$context = ContextBuilder::create()
    ->withEventLoop($loop)
    ->withConcurrency(Concurrency::atMost(4))
    ->build();

$context->on('booted', fn() => print("🚽 toalett context booted\n"));
$context->on('congestion', fn() => print('C'));
$context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-'));

// A job is submitted to the context every second.
// The job sleeps for a random amount of seconds (0 - 10).
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));

print("Press CTRL+C to stop.\n");
$context->run();

带有Job类的单个工作进程

由于任务实际上只是一个Closure,因此也可以提交一个实现了__invoke()魔术方法的对象。

在此示例中,执行限制在一个工作进程内,作业是Job类的实例。

use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;

$context = ContextBuilder::create()
    ->withConcurrency(Concurrency::singleWorker())
    ->withCleanupInterval(Interval::seconds(0.2))
    ->build();

for ($i = 0; $i < 3; $i++) {
    $title = md5(mt_rand());
    $context->submit(new Job($title));
}

$context->on('no_workers_remaining', [$context, 'stop']);
$context->run();

测试

测试可以在src/Tests目录中找到。