juzaweb/async

此包已被弃用且不再维护。未建议替代包。

基于PCNTL扩展的异步和并行PHP

资助包维护!
spatie

1.5.4 2021-09-30 07:32 UTC

This package is auto-updated.

Last update: 2024-04-29 04:53:42 UTC


README

Latest Version on Packagist Build Status Quality Score Total Downloads

此库提供了一个围绕PHP的PCNTL扩展的小型且易于使用的包装器。它允许并行运行不同的进程,具有易于使用的API。

支持我们

68747470733a2f2f6769746875622d6164732e73332e65752d63656e7472616c2d312e616d617a6f6e6177732e636f6d2f6173796e632e6a70673f743d31

我们投入了大量资源来创建最佳类别的开源包。您可以通过购买我们的付费产品之一来支持我们。

我们非常感谢您从家乡寄来明信片,并提到您正在使用我们哪个包。您可以在我们的联系页面上找到我们的地址。我们将所有收到的明信片发布在我们的虚拟明信片墙上

安装

您可以通过composer安装此包

composer require spatie/async

用法

use Spatie\Async\Pool;

$pool = Pool::create();

foreach ($things as $thing) {
    $pool->add(function () use ($thing) {
        // Do a thing
    })->then(function ($output) {
        // Handle success
    })->catch(function (Throwable $exception) {
        // Handle exception
    });
}

$pool->wait();

事件监听器

在创建异步进程时,您将返回一个ParallelProcess实例。您可以在进程上添加以下事件钩子。

$pool
    ->add(function () {
        // ...
    })
    ->then(function ($output) {
        // On success, `$output` is returned by the process or callable you passed to the queue.
    })
    ->catch(function ($exception) {
        // When an exception is thrown from within a process, it's caught and passed here.
    })
    ->timeout(function () {
        // A process took too long to finish.
    })
;

功能API

您也可以使用asyncawait辅助函数,而不是在$pool对象上使用方法。

use Spatie\Async\Pool;

$pool = Pool::create();

foreach (range(1, 5) as $i) {
    $pool[] = async(function () {
        usleep(random_int(10, 1000));

        return 2;
    })->then(function (int $output) {
        $this->counter += $output;
    });
}

await($pool);

错误处理

如果子进程中抛出了ExceptionError,可以在->catch()方法中指定一个回调来按进程捕获。

$pool
    ->add(function () {
        // ...
    })
    ->catch(function ($exception) {
        // Handle the thrown exception for this child process.
    })
;

如果没有添加错误处理器,当调用await()$pool->wait()时,错误将在父进程中抛出。

如果子进程意外停止而没有抛出Throwable,则写入stderr的输出将被包装并作为父进程中的Spatie\Async\ParallelError抛出。

按类型捕获异常

通过类型提示catch函数,您可以提供多个错误处理器,每个处理器针对错误的不同类型。

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // Handle `MyException`
    })
    ->catch(function (OtherException $e) {
        // Handle `OtherException`
    });

请注意,一旦异常被处理,就不会触发其他处理器

$pool
    ->add(function () {
        throw new MyException('test');
    })
    ->catch(function (MyException $e) {
        // This one is triggerd when `MyException` is thrown
    })
    ->catch(function (Exception $e) {
        // This one is not triggerd, even though `MyException` extends `Exception`
    });

停止池

如果您需要提前停止池,因为子进程已经完成了任务,您可以使用$pool->stop()方法。这将防止池启动任何额外的进程。

use Spatie\Async\Pool;

$pool = Pool::create();

// Generate 10k processes generating random numbers
for($i = 0; $i < 10000; $i++) {
    $pool->add(function() use ($i) {
        return rand(0, 100);
    })->then(function($output) use ($pool) {
        // If one of them randomly picks 100, end the pool early.
        if ($output === 100) {
            $pool->stop();
        }
    });
}

$pool->wait();

请注意,停止后池将变得无用,如果需要,应创建新的池。

使用另一个PHP二进制文件

默认情况下,池将使用php来执行其子进程。您可以通过以下方式配置其他二进制文件

Pool::create()
    ->withBinary('/path/to/php');

与任务一起工作

除了使用闭包外,您还可以使用Task。在需要更多子进程设置工作的场景中,Task非常有用。因为子进程总是从无到有地启动,所以您可能想在执行任务之前初始化例如依赖项容器。Task类使这更容易做到。

use Spatie\Async\Task;

class MyTask extends Task
{
    public function configure()
    {
        // Setup eg. dependency container, load config,...
    }

    public function run()
    {
        // Do the real work here.
    }
}

// Add the task to the pool
$pool->add(new MyTask());

简单任务

如果您想封装任务逻辑,但又不想创建完整的Task对象,您也可以将可调用的对象传递给Pool

class InvokableClass
{
    // ...

    public function __invoke()
    {
        // ...
    }
}

$pool->add(new InvokableClass(/* ... */));

池配置

您可以创建任意数量的池,每个池都有自己的进程队列。

池可由开发者配置

use Spatie\Async\Pool;

$pool = Pool::create()

// The maximum amount of processes which can run simultaneously.
    ->concurrency(20)

// The maximum amount of time a process may take to finish in seconds
// (decimal places are supported for more granular timeouts).
    ->timeout(15)

// Configure which autoloader sub processes should use.
    ->autoload(__DIR__ . '/../../vendor/autoload.php')
    
// Configure how long the loop should sleep before re-checking the process statuses in microseconds.
    ->sleepTime(50000)
;

同步回退

如果当前PHP运行时未安装所需的扩展(pcntlposix),则Pool将自动回退到任务的同步执行。

Pool类有一个静态方法isSupported,您可以通过调用此方法来检查您的平台是否能够运行异步进程。

如果您使用Task来运行进程,在同步模式下运行时,只会调用这些任务的run方法。

幕后

在使用这个包时,您可能想知道幕后发生了什么。

我们使用symfony/process组件在PHP中创建和管理子进程。通过动态创建子进程,我们能够并行执行PHP脚本。这种并行性在处理多个同步任务时可以显著提高性能,因为这些任务实际上并不需要相互等待。通过为这些任务分配一个单独的进程来运行,底层操作系统可以负责并行运行它们。

动态生成进程时有一个需要注意的地方:您需要确保一次不会生成太多进程,否则应用程序可能会崩溃。此包提供的Pool类会处理您想要的任何数量的进程,通过安排和运行它们,在可能的时候。

这就是async()$pool->add()所做的事情。现在让我们看看await()$pool->wait()所做的是什么。

当生成多个进程时,每个进程可能有自己的完成时间。例如,一个进程可能需要等待HTTP调用,而另一个则必须处理大量数据。有时您的代码中也有必须等待进程结果返回的点。

这就是为什么我们必须在某个时间点等待的原因:等待池中的所有进程完成,这样我们就可以确保可以安全继续,而不会意外杀死尚未完成的子进程。

通过使用while循环来等待所有进程完成,该循环将等待直到所有进程完成。确定进程何时完成是通过监听SIGCHLD信号来完成的。当操作系统内核通知子进程完成时,会发出此信号。截至PHP 7.1,对监听和处理信号的支持有了很大改善,这使得这种方法比使用进程fork或套接字进行通信等方法更高效。您可以在此处了解更多信息:这里

当进程完成时,将触发其成功事件,您可以使用->then()函数来挂钩。同样,当进程失败或超时时,循环将更新该进程的状态并继续。当所有进程都完成后,while循环将看到没有更多等待的内容,然后停止。这是父进程可以继续执行的时刻。

与其他库的比较

我们撰写了一篇博客文章,其中包含有关此包使用案例的更多信息,以及与其他异步PHP库(如ReactPHP和Amp)的比较:http://stitcher.io/blog/asynchronous-php

测试

composer test

变更日志

请参阅CHANGELOG以获取有关最近更改的更多信息。

贡献

请参阅CONTRIBUTING以获取详细信息。

安全

如果您发现任何安全相关的问题,请通过电子邮件freek@spatie.be报告,而不是使用问题跟踪器。

明信片软件

您可以自由使用此包,但如果它进入您的生产环境,我们非常感激您从家乡寄给我们一张明信片,说明您正在使用我们哪个包。

我们的地址是:Spatie,Kruikstraat 22,2018 安特卫普,比利时。

我们将发布收到的所有明信片在我们的公司网站上

鸣谢

许可证

MIT 许可证 (MIT)。请参阅许可证文件获取更多信息。