amphp/parallel

Amp 的并行处理组件

资助包维护!
amphp

安装: 16,864,246

依赖项: 68

建议者: 1

安全: 0

星级: 773

关注者: 19

分支: 64

开放问题: 15

v2.2.9 2024-03-24 18:27 UTC

README

AMPHP 是一组针对 PHP 设计的事件驱动库,考虑到纤维和并发性。 amphp/parallel 提供使用多个进程或线程的 真正的并行处理无阻塞,无需扩展

为了尽可能灵活,这个库包含了一系列非阻塞的并发工具,可以根据需要独立使用,还有一个“有观点”的工作 API,允许您将工作单元分配给一组工作进程。

Latest Release MIT License

要求

  • PHP 8.1+

使用线程而不是进程的可选要求

安装

此包可以作为 Composer 依赖项安装。

composer require amphp/parallel

使用

此库的基本用法是提交阻塞任务由工作池执行,以避免阻塞主事件循环。

<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Future;
use Amp\Parallel\Worker;
use function Amp\async;

$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$executions = [];
foreach ($urls as $url) {
    // FetchTask is just an example, you'll have to implement
    // the Task interface for your task.
    $executions[$url] = Worker\submit(new FetchTask($url));
}

// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
    fn (Worker\Execution $e) => $e->getFuture(),
    $executions,
));

foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

FetchTask 仅仅是作为这里阻塞函数的一个示例。如果您只想并发获取多个 HTTP 资源,最好使用我们的非阻塞 HTTP 客户端 amphp/http-client

注意 您调用的函数必须由 Composer 预定义或自动加载,因此它们也存在于工作进程或线程中。

工作者

Worker 提供了一个简单接口,用于在单独的 PHP 进程或线程中并行执行 PHP 代码。实现 Task 的类用于定义要并行运行的代码。

任务

Task 接口有一个单一的 run() 方法,该方法在工作进程中被调用以调度需要执行的工作。可以编写使用阻塞代码的 run() 方法,因为代码是在单独的进程或线程中执行的。

在主进程中序列化 Task 实例,并在工作进程中反序列化。这意味着在主进程和工作进程之间传递的所有数据都需要可序列化。

以下示例中定义了一个 Task,它调用一个阻塞函数(file_get_contents() 仅作为阻塞函数的示例,请使用 http-client 进行非阻塞 HTTP 请求)。

执行任务的子进程或线程可以被重用以执行多个任务。

// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

class FetchTask implements Task
{
    public function __construct(
        private readonly string $url,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): string
    {
        return file_get_contents($this->url); // Example blocking function
    }
}
// main.php

$worker = Amp\Parallel\Worker\createWorker();
$task = new FetchTask('https://amphp.org');

$execution = $worker->submit($task);

// $data will be the return value from FetchTask::run()
$data = $execution->await();

在任务之间共享数据

任务可能希望在任务运行之间共享数据。将 Cache 实例存储在静态属性中,该属性仅在 Task::run() 内初始化,是我们推荐的数据共享策略。

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

final class ExampleTask implements Task
{
    private static ?LocalCache $cache = null;
    
    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        $cache = self::$cache ??= new LocalCache();
        $cachedValue = $cache->get('cache-key');
        // Use and modify $cachedValue...
        $cache->set('cache-key', $updatedValue);
        return $updatedValue;
    }
}

您可能希望提供挂钩以使用模拟数据初始化缓存以进行测试。

由于工作进程可能正在执行多个任务,因此在创建或更新缓存值时,如果任务使用异步 I/O 生成缓存值,则考虑使用 AtomicCache 而不是 LocalCacheAtomicCache 有基于缓存键提供互斥的方法。

任务取消

提供给Worker::submit()Cancellation可以用于请求取消工作进程中的任务。当父进程中请求取消时,提供给Task::run()Cancellation将被取消。任务可以选择忽略这个取消请求,或者相应地操作并从Task::run()抛出CancelledException。如果忽略取消请求,任务可以继续执行并返回一个值,该值将作为未请求取消的情况返回给父进程。

工作进程池

使用工作进程最简单的方法是通过工作进程池。工作进程池可以像工作进程一样提交任务,但与使用单个工作进程不同,池使用多个工作进程来执行任务。这允许多个任务同时执行。

WorkerPool接口扩展了Worker,增加了获取池信息或从池中提取单个Worker实例的方法。池使用多个Worker实例来执行Task实例。

如果应该在一个单独的工作进程中运行一系列任务,请使用WorkerPool::getWorker()方法从池中提取单个工作进程。当返回的实例被销毁时,工作进程将自动返回到池中。

全局工作进程池

全局工作进程池是可用的,可以使用函数Amp\Parallel\Worker\workerPool(?WorkerPool $pool = null)来设置。传递一个WorkerPool实例将全局池设置为给定的实例。不传递实例调用该函数将返回当前的全局实例。

子进程或线程

上下文简化了并行编写和运行PHP的过程。要并行运行的脚本必须返回一个可调用的函数,该函数将在子进程或线程中运行。可调用的函数接收一个参数——一个Channel实例,可以用于在父进程和子进程或线程之间发送数据。任何可序列化的数据都可以通过此通道发送。扩展Channel接口的Context对象是通信通道的另一端。

使用ContextFactory创建上下文。默认的DefaultContextFactory将使用最佳可用的方法创建上下文,如果安装了ext-parallel则创建线程,否则使用子进程。还提供了ThreadContextFactory(需要PHP 8.2+的ZTS构建和ext-parallel以创建线程)和ProcessContextFactory,以便创建特定类型的上下文。

在下面的示例中,使用子进程或线程调用一个阻塞函数(file_get_contents()只是一个阻塞函数的例子,对于非阻塞HTTP请求请使用http-client)。然后将该函数的结果通过Channel对象发送回父进程。子可调用的返回值可以通过Context::join()方法获得。

子进程或线程

// child.php

use Amp\Sync\Channel;

return function (Channel $channel): mixed {
    $url = $channel->receive();

    $data = file_get_contents($url); // Example blocking function

    $channel->send($data);

    return 'Any serializable data';
};

父进程

// parent.php

use Amp\Parallel\Context\ProcessContext;

// Creates and starts a child process or thread.
$context = Amp\Parallel\Context\contextFactory()->start(__DIR__ . '/child.php');

$url = 'https://google.com';
$context->send($url);

$requestData = $context->receive();
printf("Received %d bytes from %s\n", \strlen($requestData), $url);

$returnValue = $context->join();
printf("Child processes exited with '%s'\n", $returnValue);

子进程或线程对于执行CPU密集型操作(如图像处理)或运行根据父进程输入执行定期任务的守护进程也非常有用。

上下文创建

可以使用函数Amp\Parallel\Context\startContext()创建执行上下文,该函数使用全局的ContextFactory。默认情况下,全局工厂是DefaultContextFactory的一个实例,但可以使用函数Amp\Parallel\Context\contextFactory()来覆盖该实例。

// Using the global context factory from Amp\Parallel\Context\contextFactory()
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');

// Creating a specific context factory and using it to create a context.
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

上下文工厂由工作进程池用于创建执行任务的上下文。向工作进程池提供一个自定义的ContextFactory允许在池工作进程中执行自定义引导或其他行为。

可以通过ContextFactory创建执行上下文。工作进程池使用上下文工厂来创建工作进程。

全局工作进程池是可用的,可以使用函数Amp\Parallel\Worker\workerPool(?WorkerPool $pool = null)来设置。传递一个WorkerPool实例将全局池设置为给定的实例。不传递实例调用该函数将返回当前的全局实例。

IPC

使用单个 Channel 创建一个上下文,该通道可用于在父进程和子进程之间双向发送数据。通道是一种高级数据交换方式,允许序列化数据通过通道发送。Channel 实现处理序列化和反序列化数据、消息封帧以及在父进程和子进程之间的基础套接字上的分块。

注意 应仅使用通道在父进程和子进程之间发送 数据。尝试在通道上发送数据库连接或文件句柄等资源将不会工作。此类资源应在每个子进程中打开。一个值得注意的例外是:可以使用 amphp/cluster 提供的工具在父进程和子进程之间发送服务器和客户端网络套接字。

下面的示例代码定义了一个类 AppMessage,其中包含一个消息类型枚举和相关依赖枚举情况的消息数据。所有在父进程和子进程之间通过通道发送的消息都使用 AppMessage 的实例来定义消息意图。子进程也可以使用不同的类进行回复,但这里为了简洁没有这么做。可以采用任何最适合您应用程序的消息策略,唯一的要求是发送到通道的任何结构都必须是可序列化的。

下面的示例在从 STDIN 接收路径后向子进程发送消息以处理图像,然后等待子进程的回复。当提供空路径时,父进程向子进程发送 null 以打破消息循环,并在子进程退出之前退出自己。

// AppMessage.php

class AppMessage {
    public function __construct(
        public readonly AppMessageType $type,
        public readonly mixed $data,
    ) {
    }
}
// AppMessageType.php

enum AppMessageType {
    case ProcessedImage;
    case ProcessImageFromPath;
    // Other enum cases for further message types...
}
// parent.php

use Amp\Parallel\Context\ProcessContextFactory;

$contextFactory = new ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

$stdin = Amp\ByteStream\getStdin();

while ($path = $stdin->read()) {
    $message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
    $context->send($message);

    $reply = $context->receive(); // Wait for reply from child context with processed image data.
}

$context->send(null); // End loop in child process.
$context->join();
// child.php

use Amp\Sync\Channel;

return function (Channel $channel): void {
    /** @var AppMessage|null $message */
    while ($message = $channel->receive()) {
        $reply = match ($message->type) {
            AppMessageType::ProcessImageFromPath => new AppMessage(
                AppMessageType::ProcessedImage,
                ImageProcessor::process($message->data),
            ),
            // Handle other message types...
        }
        
        $channel->send($reply);
    }
};

创建 IPC 套接字

有时需要在父进程和子进程上下文之间创建另一个套接字以进行专门的 IPC。一个例子是使用 ClientSocketReceivePipeClientSocketSendPipe 在父进程和子进程之间发送套接字,这些在 amphp/cluster 中找到。父进程中的 IpcHub 实例和子进程中的 Amp\Parallel\Ipc\connect() 函数。

下面的示例在父进程和子进程之间创建一个单独的 IPC 套接字,然后使用 amphp/cluster 在父进程和子进程中分别创建 ClientSocketReceivePipeClientSocketSendPipe 的实例。

// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;

$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub); 

$context = $contextFactory->start(__DIR__ . '/child.php');

$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Sync\Channel;

return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
    
    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
    
    $socketPipe = new ClientSocketReceivePipe($socket);
};

调试

可以使用 PhpStorm 和 Xdebug 在子进程中使用步骤调试,通过在 IDE 中监听调试连接。

在 PhpStorm 设置中,在 PHP > Debug 下方,确保选中“可以接受外部连接”框。具体使用的端口并不重要,您的可能不同。

PhpStorm Xdebug settings

要使子进程连接到 IDE 并在子进程中设置的断点处停止,请开启监听调试连接。

关闭监听

Debug listening off

开启监听

Debug listening on

不需要手动设置任何 PHP ini 设置。调用父 PHP 进程时由 PhpStorm 设置的设置将转发到子进程。

从 PhpStorm 以调试模式运行父脚本,并在子进程中执行的代码中设置断点。执行应在任何子进程中设置的断点处停止。

调试器正在运行

Debug running

在子进程中停止在断点处时,父进程和任何其他子进程的执行将继续。PhpStorm 将为连接到调试器的每个子进程打开一个新的调试器选项卡,因此您可能需要在调试时限制创建的子进程数量,或者连接的数量可能会变得难以承受!如果您在父进程和子进程中设置断点,您可能需要切换调试选项卡以恢复父进程和子进程。

版本控制

amphp/parallel 遵循与所有其他 amphp 包相同的 semver 语义化版本规范。

安全

如果您发现任何与安全相关的问题,请使用私有的安全问题报告器,而不是使用公共的问题跟踪器。

许可证

MIT 许可证 (MIT)。请参阅 LICENSE 以获取更多信息。