amphp/amp

PHP应用的非阻塞并发框架。

维护者

详细信息

github.com/amphp/amp

主页

源代码

问题

资助包维护!
amphp

安装次数: 72,366,244

依赖者: 309

建议者: 6

安全性: 0

星标: 4,205

关注者: 132

分支: 255

开放问题: 18

v3.0.2 2024-05-10 21:37 UTC

README

AMPHP是一组为PHP设计的事件驱动库集合,考虑了纤维和并发。特别地,amphp/amp提供了未来和取消作为异步编程的基本原语。我们现在使用Revolt,而不是在amphp/amp中提供事件循环实现。

Amp大量使用PHP 8.1提供的纤维来编写类似于同步、阻塞代码的异步代码。与早期版本相比,不需要基于生成器的协程或回调。与线程类似,每个纤维都有自己的调用栈,但纤维由事件循环合作调度。使用Amp\async()并发运行。

动机

传统上,PHP遵循顺序执行模型。PHP引擎按顺序顺序执行每行代码。然而,程序通常由多个可以并发执行的独立子程序组成。

如果你查询数据库,你会以阻塞的方式发送查询并等待数据库服务器的响应。一旦你得到响应,你就可以开始做下一件事。我们本可以在等待时发送下一个数据库查询或对API进行HTTP调用。让我们利用我们通常用于等待I/O的时间!

Revolt允许这种并发I/O操作。我们通过避免回调来降低认知负担。我们的API可以像任何其他库一样使用,除了它们可以并发工作,因为我们底层使用非阻塞I/O。使用Amp\async()并发运行,并在需要时使用Future::await()等待结果!

多年来,PHP中实现并行的技术有很多,例如PHP 5中提供的回调和生成器。这些方法都存在“你的函数是什么颜色”问题,我们通过在PHP 8.1中提供纤维来解决它。它们允许具有多个独立调用栈的并行。

纤维由事件循环合作调度,因此它们也被称为协程。重要的是要理解在任何给定时间只有一个协程正在运行,其他所有协程在此期间都处于挂起状态。

你可以将协程比作一个使用单个CPU核心运行多个程序的计算机。每个程序都有一个执行时间槽。然而,协程不是抢占式的。它们不会得到固定的时间槽。它们必须自愿放弃控制权给事件循环。

任何阻塞I/O函数在等待I/O时都会阻塞整个进程。你想要避免它们。如果你还没有阅读安装指南,请查看Hello World示例,该示例演示了阻塞函数的效果。AMPHP提供的库避免了I/O的阻塞。

安装

此包可以作为Composer依赖项安装。

composer require amphp/amp

如果您使用这个库,您很可能想使用Revolt来安排事件,即使它作为依赖项自动安装,您也必须单独要求它。

composer require revolt/event-loop

这些包为PHP中的异步/并发应用程序提供了基本构建块。我们提供了许多在这些包之上构建的包,例如:

需求

此包需要PHP 8.1或更高版本。不需要任何扩展!

扩展仅在您的应用程序需要大量并发套接字连接时才需要,通常此限制配置为最多1024个文件描述符。

使用方法

协程

协程是可中断的函数。在PHP中,可以使用fibers来实现。

注意Amp的早期版本使用生成器来执行类似操作,但纤丝可以在调用堆栈的任何地方被中断,这使得之前的Amp\call()样板代码变得不必要。

在任何给定时间,只有一个纤丝正在运行。当协程挂起时,协程的执行会暂时中断,允许运行其他任务。一旦计时器到期,可以进行流操作,或者任何待定的Future完成,执行就会恢复。

协程的低级挂起和恢复由Revolt的Suspension API处理。

<?php

require __DIR__ . '/vendor/autoload.php';

use Revolt\EventLoop;

$suspension = EventLoop::getSuspension();

EventLoop::delay(5, function () use ($suspension): void {
    print '++ Executing callback created by EventLoop::delay()' . PHP_EOL;

    $suspension->resume(null);
});

print '++ Suspending to event loop...' . PHP_EOL;

$suspension->suspend();

print '++ Script end' . PHP_EOL;

在Revolt事件循环上注册的回调会自动作为协程运行,并且可以安全地挂起它们。除了事件循环API之外,还可以使用Amp\async()来启动独立的调用堆栈。

<?php

use function Amp\delay;

require __DIR__ . '/vendor/autoload.php';

Amp\async(function () {
    print '++ Executing callback passed to async()' . PHP_EOL;

    delay(3);

    print '++ Finished callback passed to async()' . PHP_EOL;
});

print '++ Suspending to event loop...' . PHP_EOL;
delay(5);

print '++ Script end' . PHP_EOL;

Future

Future是一个表示异步操作最终结果的对象。有三种状态:

  • 成功完成:Future已成功完成。
  • 出错:Future因异常失败。
  • 挂起:Future仍在挂起。

成功完成的Future类似于返回值,而失败的Future类似于抛出异常。

处理异步API的一种方法是在操作开始时传递回调,并在操作完成后调用它。

doSomething(function ($error, $value) {
    if ($error) {
        /* ... */
    } else {
        /* ... */
    }
});

回调方法有几个缺点。

  • 在回调中传递回调并执行依赖于第一次操作结果的进一步操作会很快变得混乱。
  • 显式回调是函数的输入参数,而返回值被简单地忽略。没有不涉及回调的方式来使用此API。

这就是Future发挥作用的地方。它们是结果的占位符,就像任何其他返回值一样被返回。调用者可以选择使用Future::await()等待结果或注册一个或多个回调。

try {
    $value = doSomething()->await();
} catch (...) {
    /* ... */
}

组合器

在并发应用程序中,可能会有多个Future,您可能希望等待所有Future,或者只等待第一个。

await

Amp\Future\await($iterable, $cancellation) 等待一个可迭代对象中的所有 Future 对象。如果任何一个 Future 实例出现错误,操作将因该异常而终止。否则,结果是一个与输入 iterable 的键匹配的完成值的数组。

await() 组合子非常强大,因为它允许你同时并发执行多个异步操作。让我们看看使用 amphp/http-client 来并发检索多个 HTTP 资源的示例。

<?php

use Amp\Future;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;

$httpClient = HttpClientBuilder::buildDefault();
$uris = [
    "google" => "https://www.google.com",
    "news"   => "https://news.google.com",
    "bing"   => "https://www.bing.com",
    "yahoo"  => "https://www.yahoo.com",
];

try {
    $responses = Future\await(array_map(function ($uri) use ($httpClient) {
        return Amp\async(fn () => $httpClient->request(new Request($uri, 'HEAD')));
    }, $uris));

    foreach ($responses as $key => $response) {
        printf(
            "%s | HTTP/%s %d %s\n",
            $key,
            $response->getProtocolVersion(),
            $response->getStatus(),
            $response->getReason()
        );
    }
} catch (Exception $e) {
    // If any one of the requests fails the combo will fail
    echo $e->getMessage(), "\n";
}
awaitAnyN

Amp\Future\awaitAnyN($count, $iterable, $cancellation)await() 相同,但它可以容忍单个错误。一旦可迭代对象中的 $count 个实例成功完成,就返回一个结果。返回值是一个值数组。组件数组中的单个键保留自传递给函数进行评估的可迭代对象。

awaitAll

Amp\Promise\awaitAll($iterable, $cancellation) 等待所有未来并返回一个包含 [$errors, $values] 的数组。

awaitFirst

Amp\Promise\awaitFirst($iterable, $cancellation) 解包第一个完成的 Future,无论它是成功完成还是出错。

awaitAny

Amp\Promise\awaitAny($iterable, $cancellation) 解包第一个成功完成的 Future

Future 创建

未来可以通过几种方式创建。大多数代码将使用 Amp\async(),它接受一个函数并在另一个 Fiber 中运行它作为协程。

有时接口强制返回一个 Future,但结果立即可用,例如,因为它们被缓存。在这些情况下,可以使用 Future::complete(mixed)Future::error(Throwable) 来构造一个立即完成的 Future

DeferredFuture

注意 下面的 DeferredFuture API 是一个高级 API,许多应用程序可能不需要它。在可能的情况下,请使用 Amp\async()组合子

Amp\DeferredFuture 负责完成一个挂起的 Future。你创建一个 Amp\DeferredFuture,并使用它的 getFuture 方法返回一个 Amp\Future 给调用者。一旦结果准备就绪,你将使用链接的 DeferredFuture 上的 completeerror 完成调用者持有的 Future

final class DeferredFuture
{
    public function getFuture(): Future;
    public function complete(mixed $value = null);
    public function error(Throwable $throwable);
}

警告 如果你正在传递 DeferredFuture 对象,你很可能在做一些错误的事情。它们应该是你的操作的内态。

警告 你不能使用另一个未来来完成一个未来;在这种情况下,在调用 DeferredFuture::complete() 之前,请使用 Future::await()

以下是一个异步值生产者 asyncMultiply() 的简单示例,它创建一个 DeferredFuture 并将其关联的 Future 返回给调用者。

<?php // Example async producer using DeferredFuture

use Revolt\EventLoop;

function asyncMultiply(int $x, int $y): Future
{
    $deferred = new Amp\DeferredFuture;

    // Complete the async result one second from now
    EventLoop::delay(1, function () use ($deferred, $x, $y) {
        $deferred->complete($x * $y);
    });

    return $deferred->getFuture();
}

$future = asyncMultiply(6, 7);
$result = $future->await();

var_dump($result); // int(42)

取消

支持取消的操作接受一个 Cancellation 实例作为参数。取消是对象,允许注册处理程序以订阅取消请求。这些对象被传递给子操作或必须由操作本身处理。

$cancellation->throwIfRequested() 可以在取消请求被提出后使用,以使用 CancelledException 失败当前操作。虽然 throwIfRequested() 工作得很好,但某些操作可能希望使用回调进行订阅。他们可以通过使用 Cancellation::subscribe() 订阅任何可能发生的取消请求来这样做。

调用者可以通过以下实现之一创建一个 Cancellation

注意 取消是建议性的。DNS 解析器可能在查询发送后忽略取消请求,因为响应必须处理,并且仍然可以被缓存。HTTP 客户端可能会继续一个几乎完成 HTTP 请求以重用连接,但可能会终止分块编码响应,因为它无法知道继续实际上是否比取消更便宜。

TimeoutCancellation

TimeoutCancellations 在指定秒数后自动取消。

request("...", new Amp\TimeoutCancellation(30));

SignalCancellation

当当前进程收到指定信号后,SignalCancellation 会自动取消。

request("...", new Amp\SignalCancellation(SIGINT));

DeferredCancellation

DeferredCancellation 允许通过调用方法来手动取消。如果您需要在某处注册自定义回调而不是提供自己的实现,这是首选方式。只有调用者可以访问 DeferredCancellation 并使用 DeferredCancellation::cancel() 来取消操作。

$deferredCancellation = new Amp\DeferredCancellation();

// Register some custom callback somewhere
onSomeEvent(fn () => $deferredCancellation->cancel());

request("...", $deferredCancellation->getCancellation());

NullCancellation

NullCancellation 从不会被取消。取消通常是可选的,这通常通过使参数可以为 null 来实现。为了避免使用像 if ($cancellation) 这样的守卫,可以使用 NullCancellation

$cancellation ??= new NullCancellationToken();

CompositeCancellation

CompositeCancellation 结合了多个独立的取消对象。如果这些取消中的任何一个被取消,则 CompositeCancellation 本身也将被取消。

版本控制

amphp/amp 遵循类似于所有其他 amphp 包的 semver 语义版本规范。

兼容包

兼容包应使用 GitHub 上的 amphp 主题。

安全性

如果您发现任何安全问题,请通过电子邮件 [email protected] 而不是使用问题跟踪器来报告。

许可证

MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE