symplely / spawn
一个简单的 `uv_spawn` 或 `proc-open` 包装 API,用于执行和管理子进程池,实现并行/异步 PHP 阻塞 I/O。
Requires
- php: >7.1
- symfony/process: >4
Requires (Dev)
- phpunit/phpunit: ^6 | ^7 | ^8
Suggests
- ext-uv: >0.2.4
- dev-master
- 5.0.1
- 5.0.0
- 4.1.8
- 4.1.7
- 4.1.6
- 4.1.5
- 4.1.4
- 4.1.3
- 4.1.2
- 4.1.1
- 4.1.0
- 4.0.2
- 4.0.1
- 4.0.0
- 3.1.1
- 3.1.0
- 3.0.5
- 3.0.4
- 3.0.3
- 3.0.2
- 3.0.1
- 3.0.0
- 2.1.0
- 2.0.1
- 2.0.0
- 1.1.10
- 1.1.9
- 1.1.8
- 1.1.7
- 1.1.6
- 1.1.5
- 1.1.4
- 1.1.3
- 1.1.2
- 1.1.1
- 1.1.0
- 1.0.11
- 1.0.10
- 1.0.9
- 1.0.8
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- dev-5x
- dev-4x
This package is auto-updated.
Last update: 2024-09-07 20:00:28 UTC
README
一个简单的 uv_spawn
或 proc-open
包装 API,用于 执行 和 管理 一组 子进程,实现并行/异步 PHP 阻塞 I/O。
目录
本包使用 libuv
的功能,Node.js 库的 PHP 扩展 ext-uv。它使用 uv_spawn
函数启动进程。其性能优于 pcntl-extension 或使用 proc_open
。如果未安装 libuv
,则此包将回退到使用 [symfony/process]。
此包是我们 symplely/coroutine 包的一部分,用于处理任何无法由 Coroutine 本地处理的 阻塞 I/O 进程。
要了解更多关于 libuv 的功能,请阅读在线教程 书。
版本 3x 及以上版本的术语已更改,以与 ext-parallel
扩展使用保持一致,并作为 Thread
运行,但不受该库扩展限制的许多限制。
Channeled
和 Future
类都设计为可扩展,以创建基于 Parallel
的库的自定义 实现。目前需要 libuv
来获得实现的最大好处。
安装
composer require symplely/spawn
如果可用,此包将使用 libuv 功能。执行以下操作之一进行安装。
对于类似 Debian 的发行版,Ubuntu...
apt-get install libuv1-dev php-pear -y
对于类似 RedHat 的发行版,CentOS...
yum install libuv-devel php-pear -y
现在可以通过 Pecl 自动编译、安装和设置。
pecl channel-update pecl.php.net pecl install uv-beta
对于 Windows,有好的消息,通过 libuv
的本地 async 已到来。
稳定 PHP 版本的 Windows 构建可以从 PECL 获取。
直接从 https://windows.php.net/downloads/pecl/releases/uv/ 下载最新版本
将 libuv.dll
提取到与 PHP
二进制可执行文件相同的目录,并将 php_uv.dll
提取到 ext\
目录。
在 php.ini 中启用扩展 php_sockets.dll
和 php_uv.dll
cd C:\Php Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.2-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip" #Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.3-nts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip" #Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.4-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip" 7z x -y php_uv-0.2.4.zip libuv.dll php_uv.dll copy php_uv.dll ext\php_uv.dll del php_uv.dll del php_uv-0.2.4.zip echo extension=php_sockets.dll >> php.ini echo extension=php_uv.dll >> php.ini
用法
include 'vendor/autoload.php'; use Async\Spawn\Spawn; // Shows output by default and Channel instance is extracted from args. $future = \parallel($function, ...$args) // Shows output by default, turns on yield usage, can include additional file, and the Channel instance is extracted from args. $future = \paralleling($function, $includeFile, ...$args) // Or Does not show output by default and channel instance has to be explicitly passed in. $future = \spawn($function, $timeout, $channel) // Or Show output by default and channel instance has to be explicitly passed in. $future = \spawning($function, $timeout, $channel) // Or $future = Spawn::create(function () use ($thing) { // Do a thing }, $timeout, $channel) ->then(function ($output) { // Handle success })->catch(function (\Throwable $exception) { // Handle exception }); // Wait for `Future` to terminate. Note this should only be executed for local testing only. // Use "How to integrate into your project/package" section instead. // Second option can be used to set to display child output, default is false \spawn_run($future, true); // Or same as $future->displayOn()->run(); // Or $future->run();
通道在子进程和父进程之间传输消息
该功能已完全重新设计,以与 PHP ext-parallel 扩展类似。
请参阅 Channel 页面以获取真实示例。
include 'vendor/autoload.php'; use Async\Spawn\Channeled as Channel; $channel = Channel::make("io"); // Shows output by default and Channel instance is extracted for args. $future = parallel(function ($channel) { $channel = Channel::open($channel); for ($count = 0; $count <= 10; $count++) { $channel->send($count); } echo 'pingpangpong'; $channel->send(false); return 'return whatever'; }, (string) $channel); while (($value = $channel->recv()) !== false) { var_dump($value); } echo \spawn_output($future); // pingpangpong // Or echo \spawn_result($future); // return whatever // Or echo $future->getResult(); // return whatever
事件钩子
在创建异步进程时,您将返回一个 FutureInterface
实例。您可以在 Future
进程上添加以下事件 回调 钩子。
// Shows output by default and Channel instance is extracted for args. $future = parallel($function, ...$args) // Or $future = spawn($function, $timeout, $channel) // Or $future = Spawn::create(function () { // The second argument is optional, Defaults no timeout, // it sets The maximum amount of time a process may take to finish in seconds // The third is the Channel instance pass to Future subprocess. return `whatever`|Object|Closure|; // `whatever` will be encoded, then decoded by parent. }, int $timeout = 0 , $input = null) ->then(function ($result) { // On success, `$result` is returned by the process. }) ->catch(function ($exception) { // When an exception is thrown from within a process, it's caught and passed here. }) ->timeout(function () { // When an timeout is reached, it's caught and passed here. }) ->progress(function ($type, $data) { // Live progressing of output: `$type, $data` is returned by the Future process. // $type is `ERR` for stderr, or `OUT` for stdout. }) ->signal($signal, function ($signal) { // The process will be sent termination `signal` and stopped. // When an signal is triggered, it's caught and passed here. // This feature is only available using `libuv`. });
->then(function ($result) { // On success, `$result` is returned by the Future process or callable you passed. // }, function ($catchException) { // }, function ($progressOutput) { // } ); // To turn on displaying of child output. ->displayOn(); // Stop displaying child output. ->displayOff(); // A `Future` process can be retried. ->restart(); // Wait for `Future` to terminate. Note this should only be executed for local testing only. // Use "How to integrate into your project/package" section instead. ->run();
并行
Parallel类用于管理一组Future
对象。它提供了相同的事件钩子和错误处理。
include 'vendor/autoload.php'; use Async\Spawn\Parallel; $parallel = new Parallel(); foreach ($things as $thing) { // the second argument `optional`, can set the maximum amount of time a process may take to finish in seconds. $parallel->add(function () use ($thing) { // Do a thing }, $optional)->then(function ($output) { // Handle success // On success, `$output` is returned by the process or callable you passed to the queue. })->catch(function (\Throwable $exception) { // Handle exception // When an exception is thrown from within a process, it's caught and passed here. }); } // Wait for Parallel `Future` Pool to terminate. Note this should only be executed for local testing only. // Use "How to integrate into your project/package" section instead. $parallel->wait();
并行配置
您可以根据需要创建任意数量的并行进程池,每个并行进程池都有自己的进程队列来处理。
并行进程池可由开发者进行配置。
use Async\Spawn\Parallel; $parallel = (new Parallel()) // The maximum amount of processes which can run simultaneously. ->concurrency(20) // Configure how long the loop should sleep before re-checking the process statuses in milliseconds. ->sleepTime(50000);
幕后
此包使用uv_spawn
,并在需要时使用proc_open
作为备用方案,在PHP中创建和管理子进程池。通过动态创建子进程,我们能够在并行执行PHP脚本。这种并行性在处理多个不需要相互等待的同步I/O任务时可以显著提高性能。
通过为这些任务分配单独的进程执行,底层操作系统可以负责并行运行它们。
此包提供的Parallel
类通过调度和运行来处理尽可能多的进程。当生成多个进程时,每个进程都可以有不同的完成时间。
通过使用uv_run
或基本的子进程轮询
来等待所有进程完成,直到所有进程都完成。
当一个进程完成时,将触发其成功事件,您可以使用->then()
函数来挂钩。当进程失败时,将触发一个错误事件,您可以使用->catch()
函数来挂钩。当进程超时时,将触发一个超时事件,您可以使用->timeout()
函数来挂钩。
然后迭代将更新该进程的状态并继续执行。
与原作者的 "Spatie/Async" 的区别
此包与原作者的spatie/async实现有所不同。
Runnable
类是具有扩展功能的Future
。Pool
类是Parallel
类,其中一些功能提取到了另一个类FutureHandler
中。ParentRuntime
类是Spawn
类,它可以接受一个string
命令行操作以执行
,返回一个Future
。async
函数是带有额外启动
功能的spawn
,它会显示任何子进程的输出。- 移除了输出限制,除非在
Future
中设置超时,否则没有超时,添加了所有Symfony的Process
功能。 - 不仅限于
Linux
或CLI
,在Windows
和Apple macOS
的Web
环境中也能以相同的方式运行。 - 添加了对
libuv
事件循环库的支持,现在是主要的使用模型,如果未安装,则回退到proc-open
进程。 - Libuv允许更直接的
Channel
消息交换,与proc-open
相同,但功能更有限。
待办事项:将所有类似于ext-parallel
的功能从外部Coroutine
库中迁移过来。
之前提交的PR解决了真正的
Windows
支持问题。
如何将库集成到您的项目/包中
当您将此库包含到项目中时,您不能直接执行spawn_wait()
、spawn_run()
、wait()
或run()
函数/方法。它们主要用于在本地测试此库。您需要适应或创建自定义的事件循环例程。
Parallel
类有一个getFutureHandler()
方法,该方法返回一个FutureHandler
实例。该FutureHandler
有两个方法processing()
和isEmpty()
,您需要在自定义循环例程中调用这两个方法。这两个调用与wait()
方法在while
循环中调用相同的调用一样,额外增加了sleepingTime()
。
processing()
方法将 监控/检查 Future 的 状态,并 执行 任何适当的 事件回调 处理器。 FutureHandler 类可以 接受/处理 自定义的 事件循环,该循环定义了 executeTask(event callback, future)
和 isPcntl()
方法。自定义的 事件循环 对象应提供给 Parallel 实例化。
添加到您的 Event Loop 的基本设置
use Async\Spawn\Parallel; use Async\Spawn\FutureHandler; use Async\Spawn\FutureInterface; use Async\Spawn\ParallelInterface; class setupLoop { /** * @var Parallel */ protected $parallel; /** * @var FutureHandler */ protected $future = null; public function __construct() { $this->parallel = new Parallel($this); $this->future = $this->parallel->getFutureHandler(); } public function addFuture($callable, int $timeout = 0, bool $display = false, $channel = null): FutureInterface { $future = $this->parallel->add($callable, $timeout, $channel); return $display ? $future->displayOn() : $future; } public function getParallel(): ParallelInterface { return $this->parallel; } /** * Check for pending I/O events, signals, futures, streams/sockets/fd activity, timers or etc... */ protected function hasEvents(): bool { return !$this->future->isEmpty() || !$this->ActionEventsCheckers->isEmpty(); } public function runLoop() { while ($this->hasEvents()) { $this->future->processing(); if ($this->waitForAction()); $this->DoEventActions(); } } public function executeTask($event, $parameters = null) { $this->DoEventActions($event, $parameters); // Or just // if (\is_callable($event)) // $event($parameters); } public function isPcntl(): bool {} }
此库使用 opis/closure 包进行 closure/callable
序列化。为了使任何 函数 或 类 方法在 Future
子进程中可用,您必须修改您的 composer.json
文件以确保它被选中。 composer.json
文件应包含指向包含您始终需要的函数的文件的指针,并确保所有新类/命名空间都在其中。您不能随意创建局部 命名 的 函数
或 类
并期望它们可用。
// composer.json "autoload": { "files": [ "Extra/functions.php" ], "psr-4": { "Name\\Space\\": ["Folder/"], "Extra\\Name\\Spaces\\": ["Extra/"] } },
// functions.php if (!\function_exists('___marker')) { // // All additional extra functions needed in a `Future` process... // function ___marker() { return true; } }
错误处理
如果子进程中抛出 Exception
或 Error
,可以通过在 ->catch()
方法中指定回调来按进程捕获。
如果没有添加错误处理器,错误将在父进程中抛出。
如果子进程意外停止而没有抛出 Throwable
,写入 stderr
的输出将被包装,并在父进程中作为 Async\Spawn\SpawnError
抛出。
贡献
鼓励并欢迎贡献;我总是很高兴在 Github 上收到反馈或拉取请求 :) 为错误和新功能创建 Github Issues,并就您感兴趣的评论。
许可
MIT 许可证 (MIT)。有关更多信息,请参阅 许可文件。