rdlowrey/alert

此包已被废弃,不再维护。作者建议使用 amphp/amp 包。

PHP应用程序的非阻塞并发框架。

资助包维护!
amphp

安装数: 2,559

依赖者: 2

建议者: 0

安全: 0

星标: 4,005

关注者: 131

分支: 259

v3.0.0 2022-12-18 16:52 UTC

README

AMPHP是一组为PHP设计的事件驱动库的集合,考虑了纤维和并发。 amphp/amp特别提供了用于异步编程的基本原语:未来和取消。我们现在使用Revolt而不是与amphp/amp一起分发事件循环实现。

Amp大量使用PHP 8.1附带的自带纤维来编写异步代码,就像同步、阻塞代码一样。与早期版本相比,不需要基于生成器的协程或回调。类似于线程,每个纤维都有自己的调用栈,但纤维由事件循环协同调度。使用Amp\async()并发地运行事情。

动机

传统上,PHP遵循顺序执行模型。PHP引擎按顺序顺序执行每一行。然而,程序通常由多个可以并发执行的独立子程序组成。

如果你查询数据库,你会以阻塞的方式发送查询并等待数据库服务器的响应。一旦你有了响应,你就可以开始做下一件事。我们可以在等待时坐在这里什么也不做,而可以发送下一个数据库查询,或者执行对API的HTTP调用。让我们利用我们通常花在等待I/O上的时间!

Revolt允许这样的并发I/O操作。我们通过避免回调来降低认知负担。我们的API可以像任何其他库一样使用,除了事物可以并发执行,因为我们底层使用非阻塞I/O。使用Amp\async()并发地运行事情,并在需要时使用Future::await()等待结果!

多年来,在PHP中实现并发已有很多技术,例如PHP 5中提供的回调和生成器。这些方法都存在“你的函数是什么颜色”问题,我们通过PHP 8.1中提供的纤维解决了这个问题。它们允许具有多个独立调用栈的并发。

纤维由事件循环协同调度,因此它们也被称为协程。重要的是要理解在任何给定时间只有一个协程正在运行,所有其他协程在此期间都被挂起。

你可以将协程比作使用单个CPU核心运行多个程序的计算机。每个程序都获得一个执行时间槽。然而,协程不是抢占式的。它们不会获得固定的时间槽。它们必须自愿将控制权交给事件循环。

任何阻塞I/O函数在等待I/O时都会阻塞整个进程。你想要避免它们。如果你还没有阅读安装指南,请查看Hello World示例,它演示了阻塞函数的效果。AMPHP提供的库避免I/O阻塞。

安装

此软件包可以作为Composer依赖项进行安装。

composer require amphp/amp

如果您使用此库,您很可能想要使用Revolt来安排事件,即使它作为依赖项自动安装,您也应该单独要求它。

composer require revolt/event-loop

这些软件包为PHP中异步/并发应用程序提供基本构建块。我们在这些基础上提供了许多软件包,例如。

要求

此软件包需要PHP 8.1或更高版本。无需任何扩展!

扩展只有在您的应用程序需要大量并发套接字连接时才需要,通常此限制配置为最多1024个文件描述符。

使用

协程

协程是可中断的函数。在PHP中,它们可以使用fibers实现。

注意
Amp的先前版本使用生成器来执行类似的目的,但纤程可以在调用堆栈的任何位置中断,使之前的Amp\call()样板代码变得不必要。

在任何给定时间,只有一个纤程正在运行。当一个协程挂起时,协程的执行会暂时中断,允许运行其他任务。一旦定时器到期、流操作成为可能或任何等待的Future完成,执行就会恢复。

协程的低级挂起和恢复由Revolt的Suspension API处理。

<?php

require __DIR__ . '/vendor/autoload.php';

use Revolt\EventLoop;

$suspension = EventLoop::getSuspension();

EventLoop::delay(5, function () use ($suspension): void {
    print '++ Executing callback created by EventLoop::delay()' . PHP_EOL;

    $suspension->resume(null);
});

print '++ Suspending to event loop...' . PHP_EOL;

$suspension->suspend();

print '++ Script end' . PHP_EOL;

在Revolt事件循环上注册的回调会自动作为协程运行,并且可以安全地挂起它们。除了事件循环API之外,还可以使用Amp\async()来启动独立的调用堆栈。

<?php

use function Amp\delay;

require __DIR__ . '/vendor/autoload.php';

Amp\async(function () {
    print '++ Executing callback passed to async()' . PHP_EOL;

    delay(3);

    print '++ Finished callback passed to async()' . PHP_EOL;
});

print '++ Suspending to event loop...' . PHP_EOL;
delay(5);

print '++ Script end' . PHP_EOL;

Future

Future是表示异步操作最终结果的对象。它有三个状态

  • 成功完成:Future已成功完成。
  • 错误:Future失败,并抛出异常。
  • 挂起:Future仍在挂起。

成功完成的Future类似于返回值,而失败的Future类似于抛出异常。

处理异步API的一种方法是在操作启动时传递回调,并在完成时调用它

doSomething(function ($error, $value) {
    if ($error) {
        /* ... */
    } else {
        /* ... */
    }
});

回调方法有几个缺点。

  • 传递回调并在其中执行依赖于第一个操作结果的进一步操作会很快变得混乱。
  • 函数需要一个显式的回调作为输入参数,而返回值则被简单忽略。没有涉及回调就无法使用此API。

这正是futures发挥作用的地方。它们是像其他任何返回值一样返回的结果占位符。调用者可以选择使用 Future::await() 等待结果,或者注册一个或多个回调。

try {
    $value = doSomething()->await();
} catch (...) {
    /* ... */
}

组合器

在并发应用程序中,将会有多个futures,你可能想等待它们全部完成,或者只等待第一个完成。

await

Amp\Future\await($iterable, $cancellation) 等待 iterable 中的所有 Future 对象。如果其中一个 Future 实例发生错误,操作将因该异常而中断。否则,结果是一个与输入 iterable 中的键匹配其完成值的数组。

由于 await() 组合器允许你同时并发执行许多异步操作,因此它非常强大。让我们通过使用 amphp/http-client 来检索多个HTTP资源并发地进行一个示例。

<?php

use Amp\Future;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;

$httpClient = HttpClientBuilder::buildDefault();
$uris = [
    "google" => "https://www.google.com",
    "news"   => "https://news.google.com",
    "bing"   => "https://www.bing.com",
    "yahoo"  => "https://www.yahoo.com",
];

try {
    $responses = Future\await(array_map(function ($uri) use ($httpClient) {
        return Amp\async(fn () => $httpClient->request(new Request($uri, 'HEAD')));
    }, $uris));

    foreach ($responses as $key => $response) {
        printf(
            "%s | HTTP/%s %d %s\n",
            $key,
            $response->getProtocolVersion(),
            $response->getStatus(),
            $response->getReason()
        );
    }
} catch (Exception $e) {
    // If any one of the requests fails the combo will fail
    echo $e->getMessage(), "\n";
}
awaitAnyN

Amp\Future\awaitAnyN($count, $iterable, $cancellation)await() 相同,但可以容忍个别错误。当 iterable 中的恰好 $count 个实例成功完成时,将返回一个结果。返回值是一个值数组。组件数组中的个别键是从传递给函数进行评估的 iterable 中保留的。

awaitAll

Amp\Promise\awaitAll($iterable, $cancellation) 等待所有futures,并以 [$errors, $values] 数组的形式返回它们的结果。

awaitFirst

Amp\Promise\awaitFirst($iterable, $cancellation) 解包第一个完成的 Future,无论是成功完成还是出错。

awaitAny

Amp\Promise\awaitAny($iterable, $cancellation) 解包第一个成功完成的 Future

Future 创建

Futures可以通过几种方式创建。大多数代码将使用 Amp\async(),它接受一个函数并将其作为另一个Fiber中的协程运行。

有时,接口强制返回一个 Future,但结果立即可用,例如,因为它们被缓存。在这些情况下,可以使用 Future::complete(mixed)Future::error(Throwable) 来构造一个立即完成的 Future

DeferredFuture

注意
下面描述的 DeferredFuture API 是一个高级API,许多应用程序可能不需要。尽可能使用 Amp\async()组合器

Amp\DeferredFuture 负责完成挂起的 Future。你创建一个 Amp\DeferredFuture,并使用它的 getFuture 方法返回一个 Amp\Future 给调用者。一旦结果准备好,你将使用在关联的 DeferredFuture 上调用 completeerror 来完成调用者持有的 Future

final class DeferredFuture
{
    public function getFuture(): Future;
    public function complete(mixed $value = null);
    public function error(Throwable $throwable);
}

警告
如果你正在传递 DeferredFuture 对象,你可能做错了什么。它们应该是你操作的内状态。

警告
你不能用另一个未来完成一个未来;在这种情况下,在调用 DeferredFuture::complete() 之前使用 Future::await()

以下是一个简单的异步值生产者 asyncMultiply() 创建一个 DeferredFuture 并将其关联的 Future 返回给调用者的例子。

<?php // Example async producer using DeferredFuture

use Revolt\EventLoop;

function asyncMultiply(int $x, int $y): Future
{
    $deferred = new Amp\DeferredFuture;

    // Complete the async result one second from now
    EventLoop::delay(1, function () use ($deferred, $x, $y) {
        $deferred->complete($x * $y);
    });

    return $deferred->getFuture();
}

$future = asyncMultiply(6, 7);
$result = $future->await();

var_dump($result); // int(42)

取消操作

支持取消操作的操作都接受一个 Cancellation 实例作为参数。取消操作是允许注册处理程序以订阅取消请求的对象。这些对象传递给子操作或由操作本身处理。

$cancellation->throwIfRequested() 可用于在取消请求后使用 CancelledException 失败当前操作。虽然 throwIfRequested() 工作得很好,但某些操作可能希望使用回调来订阅。他们可以使用 Cancellation::subscribe() 订阅可能发生的任何取消请求。

调用者可以通过以下实现之一创建 Cancellation

注意
取消操作仅提供建议。DNS 解析器可能忽略查询发送后的取消请求,因为响应仍然需要处理,并且仍然可以缓存。HTTP 客户端可能会继续几乎完成 HTTP 请求以重用连接,但可能会中止分块编码响应,因为它无法知道继续是否实际上比取消更便宜。

超时取消操作

TimeoutCancellations 在指定秒数后自动取消自己。

request("...", new Amp\TimeoutCancellation(30));

信号取消操作

SignalCancellation 在当前进程接收到指定信号后自动取消自己。

request("...", new Amp\SignalCancellation(SIGINT));

延迟取消操作

DeferredCancellation 允许通过调用方法手动取消。如果你需要在某处注册一些自定义回调而不是提供自己的实现,这是首选的方式。只有调用者可以访问 DeferredCancellation 并使用 DeferredCancellation::cancel() 来取消操作。

$deferredCancellation = new Amp\DeferredCancellation();

// Register some custom callback somewhere
onSomeEvent(fn () => $deferredCancellation->cancel());

request("...", $deferredCancellation->getCancellation());

空取消操作

NullCancellation 永远不会取消。取消通常是可选的,通常通过使参数可空来实现。为了避免使用像 if ($cancellation) 这样的守卫,可以使用 NullCancellation

$cancellation ??= new NullCancellationToken();

组合取消操作

CompositeCancellation 结合多个独立的取消对象。如果这些取消中的任何一个被取消,则 CompositeCancellation 本身也会被取消。

版本控制

amphp/amp 遵循与所有其他 amphp 包相同的 semver 语义版本规范。

兼容包

兼容包应使用 GitHub 上的 amphp 主题。

安全

如果您发现任何与安全相关的问题,请通过电子邮件发送至 me@kelunik.com,而不是使用问题跟踪器。

许可协议

MIT 许可协议 (MIT)。请参阅 LICENSE 了解更多信息。