jenky/concurrency

使用PSR-18客户端实现并发发送HTTP请求的简单高效解决方案。

0.1.0 2023-11-05 13:13 UTC

This package is auto-updated.

Last update: 2024-09-05 15:14:56 UTC


README

Latest Version on Packagist Github Actions Codecov Total Downloads Software License

使用PSR-18客户端实现并发发送HTTP请求的简单高效解决方案。

Peak是一个库,它允许使用请求池并发发送请求。它利用AMPHP、ReactPHP或PSL的事件循环来并发处理和管理请求。

要求

  • PHP 8.1或更高版本。
  • 一个底层使用Fibers支持非阻塞I/O的包(现在称为驱动)。

安装

您可以通过composer安装此包

composer require fansipan/peak

此外,根据您选择的驱动程序,可能还需要安装以下包。

AMPHP

composer require amphp/pipeline

PSL

composer require azjezz/psl

ReactPHP

composer require clue/mq-react react/async

用法

创建请求池

典型应用会使用PoolFactory类来创建一个池。

use Fansipan\Peak\PoolFactory;

/** @var \Psr\Http\Client\ClientInterface $client */
$pool = PoolFactory::createForClient($client);

它将尝试使用AsyncClientFactory创建客户端的异步版本。支持的客户端是GuzzleSymfony HTTPClientPsr18Client)。

您可以使用任何PSR-18客户端实现与ReactPHP驱动程序一起使用。如果使用了不支持的客户端,它将被替换为Browser HTTP客户端(需要安装react/http)。

Fansipan\Peak\PoolFactory提供基于已安装包的配置请求池,这适用于大多数情况。但是,如果您希望,您可以指定特定的实现,如果它在您的平台和/或应用程序上可用。

首先,您需要创建您的驱动程序

use Fansipan\Peak\Concurrency\AmpDeferred;
use Fansipan\Peak\Concurrency\PslDeferred;
use Fansipan\Peak\Concurrency\ReactDeferred;

// AMPHP
$defer = new AmpDeferred();

// PSL
$defer = new PslDeferred();

// ReactPHP
$defer = new ReactDeferred();

然后创建一个异步客户端,这本质上是PSR-18客户端的包装器

use Fansipan\Peak\Client\GuzzleClient;
use Fansipan\Peak\Client\SymfonyClient;
use Fansipan\Peak\ClientPool;

// Guzzle

$asyncClient = new GuzzleClient($defer);
// or using existing Guzzle client
/** @var \GuzzleHttp\ClientInterface $client */
$asyncClient = new GuzzleClient($defer, $client);

// Symfony HTTP Client

$asyncClient = new SymfonyClient($defer);
// or using existing Symfony client
/** @var \Symfony\Contracts\HttpClient\HttpClientInterface $client */
$asyncClient = new SymfonyClient($defer, $client);


$pool = new ClientPool($asyncClient);

发送请求

send方法接受一个PSR-7请求的迭代器或接受一个Psr\Http\Client\ClientInterface实例的闭包/可调用类。

use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

// Using array
$responses = $pool->send([
    $psr7Request,
    fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request),
]);

var_dump($responses[0]);
var_dump($responses[1]);

// Using generator when you have an indeterminate amount of requests you wish to send
$requests = static function (int $total) {
    for ($i = 0; $i < $total; $i++) {
        yield $psr7Request;
    }
}
$responses = $pool->send($requests(100));

检索响应

如上例所示,每个响应实例都可以使用索引访问。但是,响应顺序没有保证。如果您愿意,您可以给请求分配名称,以便轻松跟踪已发送的特定请求。这允许您通过其分配的名称访问相应的响应。

use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

$responses = $pool->send([
    'first' => $psr7Request,
    'second' => fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request),
]);

// Or using generator

$requests = function (): \Generator {
    yield 'first' => $psr7Request;
    yield 'second' => fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request);
};

$responses = $pool->send($requests());

var_dump($responses['first']);
var_dump($responses['second']);

并发限制

发送过多的请求可能会耗尽您这边的所有资源,甚至如果远程端看到您这边的请求数量不合理,可能会被禁用。

因此,通常建议在发送端将并发限制在一个合理的值。通常使用一个相对较小的限制,因为同时做超过十几件事情可能会轻易压倒接收端。

您可以使用concurrent方法设置并发发送的最大请求数。默认值是25

$response = $pool
    ->concurrent(10) // Process up to 10 requests concurrently
    ->send($requests);

超过并发限制的额外请求将自动入队,直到某个挂起的请求完成。

测试

composer test

变更日志

有关最近更改的更多信息,请参阅CHANGELOG

贡献

请参阅CONTRIBUTINGCODE_OF_CONDUCT以获取详细信息。

安全

如果您发现任何与安全相关的问题,请通过contact@lynh.me发送电子邮件,而不是使用问题跟踪器。

致谢

许可协议

MIT许可协议(MIT)。请参阅许可文件以获取更多信息。