spatie/async

使用PCNTL扩展实现异步和并行PHP

资助包维护!
spatie

安装: 2,614,122

依赖项: 39

建议者: 2

安全性: 0

星标: 2,613

关注者: 38

分支: 177

开放问题: 6

1.6.1 2023-12-14 13:21 UTC

This package is auto-updated.

Last update: 2024-08-26 14:42:44 UTC


README

Latest Version on Packagist Tests Status Quality Score Total Downloads

此库提供PHP PCNTL扩展的小型且易于使用的包装。它允许并行运行不同的进程,具有易于使用的API。

支持我们

我们投入了大量资源来创建一流的开放源代码包。您可以通过购买我们的付费产品之一来支持我们。

我们非常感谢您从家乡寄来明信片,说明您正在使用我们的哪个包。您可以在我们的联系页面上找到我们的地址。我们将所有收到的明信片发布在我们的虚拟明信片墙上

安装

您可以通过composer安装此包

composer require spatie/async

用法

use Spatie\Async\Pool;

$pool = Pool::create();

foreach ($things as $thing) {
    $pool->add(function () use ($thing) {
        // Do a thing
    })->then(function ($output) {
        // Handle success
    })->catch(function (Throwable $exception) {
        // Handle exception
    });
}

$pool->wait();

事件监听器

在创建异步进程时,您将返回一个ParallelProcess实例。您可以在进程中添加以下事件钩子。

$pool
    ->add(function () {
        // ...
    })
    ->then(function ($output) {
        // On success, `$output` is returned by the process or callable you passed to the queue.
    })
    ->catch(function ($exception) {
        // When an exception is thrown from within a process, it's caught and passed here.
    })
    ->timeout(function () {
        // A process took too long to finish.
    })
;

函数式API

您也可以使用asyncawait辅助函数,而不是使用$pool对象上的方法。

use Spatie\Async\Pool;

$pool = Pool::create();

foreach (range(1, 5) as $i) {
    $pool[] = async(function () {
        usleep(random_int(10, 1000));

        return 2;
    })->then(function (int $output) {
        $this->counter += $output;
    });
}

await($pool);

错误处理

如果从子进程中抛出了ExceptionError,可以通过在->catch()方法中指定回调来逐个进程地捕获。

$pool
    ->add(function () {
        // ...
    })
    ->catch(function ($exception) {
        // Handle the thrown exception for this child process.
    })
;

如果没有添加错误处理程序,则在调用await()$pool->wait()时,错误将在父进程中抛出。

如果子进程意外停止而没有抛出Throwable,则将写入stderr的输出包装并抛出为父进程中的Spatie\Async\ParallelError

按类型捕获异常

通过类型提示catch函数,您可以提供多个错误处理程序,每个用于不同类型的错误。

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // Handle `MyException`
    })
    ->catch(function (OtherException $e) {
        // Handle `OtherException`
    });

请注意,一旦处理了异常,它就不会触发任何其他处理程序

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // This one is triggerd when `MyException` is thrown
    })
    ->catch(function (Exception $e) {
        // This one is not triggerd, even though `MyException` extends `Exception`
    });

停止池

如果您需要提前停止池,因为子进程之一已经完成了它所执行的任务,您可以使用$pool->stop()方法。这将防止池启动任何额外的进程。

use Spatie\Async\Pool;

$pool = Pool::create();

// Generate 10k processes generating random numbers
for($i = 0; $i < 10000; $i++) {
    $pool->add(function() use ($i) {
        return rand(0, 100);
    })->then(function($output) use ($pool) {
        // If one of them randomly picks 100, end the pool early.
        if ($output === 100) {
            $pool->stop();
        }
    });
}

$pool->wait();

请注意,在停止后,池将变得无用的,如果需要,应创建新的池。

使用另一个PHP二进制文件

默认情况下,池将使用php来执行其子进程。您可以通过以下方式配置另一个二进制文件

Pool::create()
    ->withBinary('/path/to/php');

与任务一起工作

除了使用闭包之外,您还可以使用Task。当您需要在子进程中执行更多设置工作的情况时,Task非常有用。由于子进程始终从无启动,因此您可能希望在执行任务之前初始化例如依赖注入容器。Task类使这变得更容易。

use Spatie\Async\Task;

class MyTask extends Task
{
    public function configure()
    {
        // Setup eg. dependency container, load config,...
    }

    public function run()
    {
        // Do the real work here.
    }
}

// Add the task to the pool
$pool->add(new MyTask());

简单任务

如果您想封装任务的逻辑,但不想创建完整的Task对象,您还可以将可调用的对象传递给Pool

class InvokableClass
{
    // ...

    public function __invoke()
    {
        // ...
    }
}

$pool->add(new InvokableClass(/* ... */));

池配置

您可以创建任意数量的池,每个池都有自己的进程队列来处理。

池可由开发者配置

use Spatie\Async\Pool;

$pool = Pool::create()

// The maximum amount of processes which can run simultaneously.
    ->concurrency(20)

// The maximum amount of time a process may take to finish in seconds
// (decimal places are supported for more granular timeouts).
    ->timeout(15)

// Configure which autoloader sub processes should use.
    ->autoload(__DIR__ . '/../../vendor/autoload.php')
    
// Configure how long the loop should sleep before re-checking the process statuses in microseconds.
    ->sleepTime(50000)
;

同步回退

如果当前PHP运行时没有安装所需的扩展(pcntlposix),则 Pool 将自动回退到同步执行任务。

Pool 类有一个静态方法 isSupported,您可以调用它来检查您的平台是否能够运行异步进程。

如果您使用 Task 来运行进程,在同步模式下,只有这些任务的 run 方法会被调用。

幕后

当使用此包时,您可能想知道底层发生了什么。

我们使用 symfony/process 组件在PHP中创建和管理子进程。通过动态创建子进程,我们能够在并行中执行PHP脚本。这种并行性在处理多个同步任务时可以显著提高性能,因为这些任务实际上不需要相互等待。通过为这些任务分配单独的进程来运行,底层操作系统可以负责并行运行它们。

动态创建进程时有一个注意事项:您需要确保不会同时创建太多的进程,否则应用程序可能会崩溃。此包提供的 Pool 类通过安排和运行进程来处理您想要的任何数量的进程。

这就是 async()$pool->add() 的作用。现在让我们看看 await()$pool->wait() 的作用。

当创建了多个进程时,每个进程可能都有不同的完成时间。例如,一个进程可能需要等待HTTP调用,而另一个进程需要处理大量数据。有时您的代码中也有需要等待进程返回结果的位置。

这就是为什么我们必须在某个时间点等待的原因:为了确保所有池中的进程都已完成,这样我们就可以确保在没有意外杀死尚未完成的子进程的情况下继续执行。

通过使用 while 循环等待所有进程完成。通过在 SIGCHLD 信号上使用监听器来确定进程何时完成。当操作系统内核通知子进程完成时,会发出此信号。截至PHP 7.1,对监听和处理信号的支持要好得多,这使得这种方法比使用进程分叉或套接字进行通信更高效。您可以在此处了解更多信息:这里

进程完成后,会触发其成功事件,您可以通过 ->then() 函数来连接。同样,当进程失败或超时时,循环会更新该进程的状态并继续。当所有进程都完成后,while循环会看到没有更多要等待的,然后停止。这时,父进程可以继续执行。

与其他库的比较

我们撰写了一篇博客文章,其中包含有关此包的使用案例的更多信息,以及与其他异步PHP库(如ReactPHP和Amp)的比较:http://stitcher.io/blog/asynchronous-php

测试

composer test

变更日志

请参阅 CHANGELOG 了解最近有哪些变化。

贡献

请参阅 CONTRIBUTING 了解详细信息。

安全性

如果您发现了关于安全性的错误,请通过邮件 [email protected] 而不是使用问题跟踪器。

明信片软件

您可以自由使用此包,但如果它进入您的生产环境,我们非常感谢您从您的家乡寄给我们一张明信片,注明您正在使用我们的哪个包。

我们的地址是:Spatie,Kruikstraat 22,2018 安特卫普,比利时。

我们将所有收到的明信片发布在我们的公司网站上。请点击此处查看

鸣谢

许可证

MIT 许可证(MIT)。请参阅许可证文件以获取更多信息。