modstore/async

由 spatie/async 衍生

1.0.2 2019-05-17 06:49 UTC

This package is auto-updated.

Last update: 2024-09-06 12:09:34 UTC


README

Latest Version on Packagist Build Status Quality Score StyleCI Total Downloads

这个库为PHP的PCNTL扩展提供了一个小巧且易于使用的包装器。它允许并行运行不同的进程,并提供了易于使用的API。

安装

您可以通过composer安装此包

composer require spatie/async

用法

use Spatie\Async\Pool;

$pool = Pool::create();

foreach ($things as $thing) {
    $pool->add(function () use ($thing) {
        // Do a thing
    })->then(function ($output) {
        // Handle success
    })->catch(function (Throwable $exception) {
        // Handle exception
    });
}

$pool->wait();

事件监听器

在创建异步进程时,您将获得一个返回的ParallelProcess实例。您可以在进程上添加以下事件钩子。

$pool
    ->add(function () {
        // ...
    })
    ->then(function ($output) {
        // On success, `$output` is returned by the process or callable you passed to the queue.
    })
    ->catch(function ($exception) {
        // When an exception is thrown from within a process, it's caught and passed here.
    })
    ->timeout(function () {
        // A process took too long to finish.
    })
;

功能API

除了使用$pool对象上的方法外,您还可以使用asyncawait辅助函数。

use Spatie\Async\Process;

$pool = Pool::create();

foreach (range(1, 5) as $i) {
    $pool[] = async(function () {
        usleep(random_int(10, 1000));

        return 2;
    })->then(function (int $output) {
        $this->counter += $output;
    });
}

await($pool);

错误处理

如果子进程抛出了ExceptionError,则可以通过在->catch()方法中指定回调来逐个捕获。

$pool
    ->add(function () {
        // ...
    })
    ->catch(function ($exception) {
        // Handle the thrown exception for this child process.
    })
;

如果没有添加错误处理程序,则在调用await()$pool->wait()时,错误将在父进程中抛出。

如果子进程意外停止而没有抛出Throwable,则写入stderr的输出将被包装并作为Spatie\Async\ParallelError在父进程中抛出。

按类型捕获异常

通过类型提示catch函数,您可以为每种错误类型提供多个错误处理程序。

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // Handle `MyException`
    })
    ->catch(function (OtherException $e) {
        // Handle `OtherException`
    });

请注意,一旦异常被处理,它就不会触发其他处理程序

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // This one is triggerd when `MyException` is thrown
    })
    ->catch(function (Exception $e) {
        // This one is not triggerd, even though `MyException` extends `Exception`
    });

处理任务

除了使用闭包之外,您还可以使用Task。在需要更多子进程设置工作的情况下,Task非常有用。因为子进程总是从无到有启动的,所以您可能希望在执行任务之前初始化依赖项容器等。Task类使这更容易完成。

use Spatie\Async\Task;

class MyTask extends Task
{
    public function configure()
    {
        // Setup eg. dependency container, load config,...
    }

    public function run()
    {
        // Do the real work here.
    }
}

// Add the task to the pool
$pool->add(new MyTask());

简单任务

如果您想封装任务的逻辑,但又不想创建完整的Task对象,您还可以将可调用对象传递给Pool

class InvokableClass
{
    // ...

    public function __invoke()
    {
        // ...
    }
}

$pool->add(new InvokableClass(/* ... */));

池配置

您可以创建任意数量的池,每个池都有自己的进程队列,将处理这些进程。

池可由开发人员配置

use Spatie\Async\Pool;

$pool = Pool::create()

// The maximum amount of processes which can run simultaneously.
    ->concurrency(20)

// The maximum amount of time a process may take to finish in seconds.
    ->timeout(15)

// Configure which autoloader sub processes should use.
    ->autoload(__DIR__ . '/../../vendor/autoload.php')
    
// Configure how long the loop should sleep before re-checking the process statuses in milliseconds.
    ->sleepTime(50000)
;

同步回退

如果当前PHP运行时未安装所需的扩展(pcntlposix),则Pool将自动回退到任务的同步执行。

Pool类有一个静态方法isSupported,您可以通过调用它来检查您的平台是否能够运行异步进程。

如果您使用Task来运行进程,则仅在同步模式下调用这些任务的run方法。

幕后

当使用此包时,您可能想知道表面之下发生了什么。

我们使用symfony/process组件在PHP中创建和管理子进程。通过动态创建子进程,我们能够并行执行PHP脚本。这种并行性可以显著提高处理多个同步任务时的性能,这些任务实际上不需要互相等待。通过给这些任务分配单独的进程来运行,底层操作系统可以负责并行运行它们。

在动态创建进程时有一个注意事项:你需要确保一次不要创建太多进程,否则应用程序可能会崩溃。本包提供的Pool类通过调度和运行进程来处理你想要的任何数量的进程。

这就是async()$pool->add()所做的工作。现在让我们看看await()$pool->wait()做了什么。

当创建多个进程时,每个进程可能需要不同的时间来完成。例如,一个进程可能需要等待HTTP调用,而另一个进程可能需要处理大量数据。有时你的代码中也有需要等待进程返回结果的位置。

这就是为什么我们必须在某个时间点等待的原因:等待池中的所有进程都完成,这样我们就可以确定可以继续,而不会意外杀死尚未完成的子进程。

使用while循环等待所有进程完成,它将等待直到所有进程都完成。通过在SIGCHLD信号上使用监听器来确定进程何时完成。这个信号在操作系统内核完成子进程时发出。从PHP 7.1开始,对监听和处理信号的支持要好得多,这使得这种方法比使用进程分叉或套接字进行通信更高效。你可以在这里了解更多信息:这里

当进程完成时,会触发其成功事件,你可以通过->then()函数来连接。同样,当进程失败或超时时,循环将更新该进程的状态并继续。当所有进程都完成时,while循环将看到没有更多等待的内容,然后停止。这是父进程可以继续执行的时刻。

与其他库的比较

我们已经写了一篇博客文章,其中包含有关此包用例的更多信息,以及与其他异步PHP库(如ReactPHP和Amp)的比较:http://stitcher.io/blog/asynchronous-php

测试

composer test

变更日志

请参阅CHANGELOG以获取有关最近更改的更多信息。

贡献

请参阅CONTRIBUTING以获取详细信息。

安全性

如果你发现任何与安全相关的问题,请通过电子邮件freek@spatie.be与我们联系,而不是使用问题跟踪器。

明信片软件

你可以自由使用这个包,但如果它进入了你的生产环境,我们非常感激你从你的家乡给我们寄一张明信片,并说明你正在使用我们的哪些包。

我们的地址是:Spatie,Samberstraat 69D,2060 安特卫普,比利时。

我们将所有收到的明信片发布在我们的公司网站上

鸣谢

支持我们

Spatie是一家位于比利时的安特卫普的网页设计公司。你可以在我们的网站上找到所有我们的开源项目的概述:我们的网站

如果你的业务依赖于我们的贡献,请与我们联系并在Patreon上支持我们。所有承诺都将致力于分配人力资源以维护和新奇事物。

许可

MIT许可(MIT)。请参阅许可文件以获取更多信息。