juzaweb / async
基于PCNTL扩展的异步和并行PHP
Requires
- php: ^7.1 || ^8.0
- opis/closure: ^3.4.2
- symfony/process: ^3.3 || ^4.0 || ^5.0
Requires (Dev)
- larapack/dd: ^1.1
- phpunit/phpunit: ^7.5 || ^8.0 || ^9.0
- symfony/stopwatch: ^4.0 || ^5.0
Suggests
- ext-pcntl: Required to use async processes
- ext-posix: Required to use async processes
README
此库提供了一个围绕PHP的PCNTL扩展的小型且易于使用的包装器。它允许并行运行不同的进程,具有易于使用的API。
支持我们
我们投入了大量资源来创建最佳类别的开源包。您可以通过购买我们的付费产品之一来支持我们。
我们非常感谢您从家乡寄来明信片,并提到您正在使用我们哪个包。您可以在我们的联系页面上找到我们的地址。我们将所有收到的明信片发布在我们的虚拟明信片墙上。
安装
您可以通过composer安装此包
composer require spatie/async
用法
use Spatie\Async\Pool; $pool = Pool::create(); foreach ($things as $thing) { $pool->add(function () use ($thing) { // Do a thing })->then(function ($output) { // Handle success })->catch(function (Throwable $exception) { // Handle exception }); } $pool->wait();
事件监听器
在创建异步进程时,您将返回一个ParallelProcess
实例。您可以在进程上添加以下事件钩子。
$pool ->add(function () { // ... }) ->then(function ($output) { // On success, `$output` is returned by the process or callable you passed to the queue. }) ->catch(function ($exception) { // When an exception is thrown from within a process, it's caught and passed here. }) ->timeout(function () { // A process took too long to finish. }) ;
功能API
您也可以使用async
和await
辅助函数,而不是在$pool
对象上使用方法。
use Spatie\Async\Pool; $pool = Pool::create(); foreach (range(1, 5) as $i) { $pool[] = async(function () { usleep(random_int(10, 1000)); return 2; })->then(function (int $output) { $this->counter += $output; }); } await($pool);
错误处理
如果子进程中抛出了Exception
或Error
,可以在->catch()
方法中指定一个回调来按进程捕获。
$pool ->add(function () { // ... }) ->catch(function ($exception) { // Handle the thrown exception for this child process. }) ;
如果没有添加错误处理器,当调用await()
或$pool->wait()
时,错误将在父进程中抛出。
如果子进程意外停止而没有抛出Throwable
,则写入stderr
的输出将被包装并作为父进程中的Spatie\Async\ParallelError
抛出。
按类型捕获异常
通过类型提示catch
函数,您可以提供多个错误处理器,每个处理器针对错误的不同类型。
$pool ->add(function () { throw new MyException('test'); }) ->catch(function (MyException $e) { // Handle `MyException` }) ->catch(function (OtherException $e) { // Handle `OtherException` });
请注意,一旦异常被处理,就不会触发其他处理器
$pool ->add(function () { throw new MyException('test'); }) ->catch(function (MyException $e) { // This one is triggerd when `MyException` is thrown }) ->catch(function (Exception $e) { // This one is not triggerd, even though `MyException` extends `Exception` });
停止池
如果您需要提前停止池,因为子进程已经完成了任务,您可以使用$pool->stop()
方法。这将防止池启动任何额外的进程。
use Spatie\Async\Pool; $pool = Pool::create(); // Generate 10k processes generating random numbers for($i = 0; $i < 10000; $i++) { $pool->add(function() use ($i) { return rand(0, 100); })->then(function($output) use ($pool) { // If one of them randomly picks 100, end the pool early. if ($output === 100) { $pool->stop(); } }); } $pool->wait();
请注意,停止后池将变得无用,如果需要,应创建新的池。
使用另一个PHP二进制文件
默认情况下,池将使用php
来执行其子进程。您可以通过以下方式配置其他二进制文件
Pool::create() ->withBinary('/path/to/php');
与任务一起工作
除了使用闭包外,您还可以使用Task
。在需要更多子进程设置工作的场景中,Task
非常有用。因为子进程总是从无到有地启动,所以您可能想在执行任务之前初始化例如依赖项容器。Task
类使这更容易做到。
use Spatie\Async\Task; class MyTask extends Task { public function configure() { // Setup eg. dependency container, load config,... } public function run() { // Do the real work here. } } // Add the task to the pool $pool->add(new MyTask());
简单任务
如果您想封装任务逻辑,但又不想创建完整的Task
对象,您也可以将可调用的对象传递给Pool
。
class InvokableClass { // ... public function __invoke() { // ... } } $pool->add(new InvokableClass(/* ... */));
池配置
您可以创建任意数量的池,每个池都有自己的进程队列。
池可由开发者配置
use Spatie\Async\Pool; $pool = Pool::create() // The maximum amount of processes which can run simultaneously. ->concurrency(20) // The maximum amount of time a process may take to finish in seconds // (decimal places are supported for more granular timeouts). ->timeout(15) // Configure which autoloader sub processes should use. ->autoload(__DIR__ . '/../../vendor/autoload.php') // Configure how long the loop should sleep before re-checking the process statuses in microseconds. ->sleepTime(50000) ;
同步回退
如果当前PHP运行时未安装所需的扩展(pcntl
和posix
),则Pool
将自动回退到任务的同步执行。
Pool
类有一个静态方法isSupported
,您可以通过调用此方法来检查您的平台是否能够运行异步进程。
如果您使用Task
来运行进程,在同步模式下运行时,只会调用这些任务的run
方法。
幕后
在使用这个包时,您可能想知道幕后发生了什么。
我们使用symfony/process
组件在PHP中创建和管理子进程。通过动态创建子进程,我们能够并行执行PHP脚本。这种并行性在处理多个同步任务时可以显著提高性能,因为这些任务实际上并不需要相互等待。通过为这些任务分配一个单独的进程来运行,底层操作系统可以负责并行运行它们。
动态生成进程时有一个需要注意的地方:您需要确保一次不会生成太多进程,否则应用程序可能会崩溃。此包提供的Pool
类会处理您想要的任何数量的进程,通过安排和运行它们,在可能的时候。
这就是async()
或$pool->add()
所做的事情。现在让我们看看await()
或$pool->wait()
所做的是什么。
当生成多个进程时,每个进程可能有自己的完成时间。例如,一个进程可能需要等待HTTP调用,而另一个则必须处理大量数据。有时您的代码中也有必须等待进程结果返回的点。
这就是为什么我们必须在某个时间点等待的原因:等待池中的所有进程完成,这样我们就可以确保可以安全继续,而不会意外杀死尚未完成的子进程。
通过使用while
循环来等待所有进程完成,该循环将等待直到所有进程完成。确定进程何时完成是通过监听SIGCHLD
信号来完成的。当操作系统内核通知子进程完成时,会发出此信号。截至PHP 7.1,对监听和处理信号的支持有了很大改善,这使得这种方法比使用进程fork或套接字进行通信等方法更高效。您可以在此处了解更多信息:这里。
当进程完成时,将触发其成功事件,您可以使用->then()
函数来挂钩。同样,当进程失败或超时时,循环将更新该进程的状态并继续。当所有进程都完成后,while循环将看到没有更多等待的内容,然后停止。这是父进程可以继续执行的时刻。
与其他库的比较
我们撰写了一篇博客文章,其中包含有关此包使用案例的更多信息,以及与其他异步PHP库(如ReactPHP和Amp)的比较:http://stitcher.io/blog/asynchronous-php。
测试
composer test
变更日志
请参阅CHANGELOG以获取有关最近更改的更多信息。
贡献
请参阅CONTRIBUTING以获取详细信息。
安全
如果您发现任何安全相关的问题,请通过电子邮件freek@spatie.be报告,而不是使用问题跟踪器。
明信片软件
您可以自由使用此包,但如果它进入您的生产环境,我们非常感激您从家乡寄给我们一张明信片,说明您正在使用我们哪个包。
我们的地址是:Spatie,Kruikstraat 22,2018 安特卫普,比利时。
我们将发布收到的所有明信片在我们的公司网站上。
鸣谢
许可证
MIT 许可证 (MIT)。请参阅许可证文件获取更多信息。