clue/mq-react

Mini Queue,基于ReactPHP的轻量级内存消息队列,可同时并发处理许多(但不是太多)事情

v1.6.0 2023-07-28 14:12 UTC

This package is auto-updated.

Last update: 2024-08-29 11:56:18 UTC


README

CI status code coverage installs on Packagist

Mini Queue,基于ReactPHP的轻量级内存消息队列,可同时并发处理许多(但不是太多)事情。

假设你爬取一个页面并发现你需要向以下页面发送100个HTTP请求,每个请求耗时约0.2秒。你可以选择按顺序发送它们(大约需要20秒),或者使用ReactPHP并发地请求所有页面。这对于少量操作来说效果很好,但发送过多的请求可能会耗尽你方的资源,或者因为远程方认为你的请求数量不合理而被禁止。相反,你可以使用这个库来有效地限制操作速率,并将多余的请求排队,以便一次不过度处理太多操作。这个库提供了一个简单易用的API,可以轻松管理各种异步操作,无需处理大部分底层细节。你可以用它来限制多个HTTP请求、数据库查询或几乎任何已使用Promise的API。

  • 异步执行操作 - 处理任意数量的异步操作,并选择同时处理多少个操作,以及内存中可以排队多少个操作。在收到响应后立即处理结果。基于Promise的设计提供了处理无序结果的合理接口。
  • 轻量级、SOLID设计 - 提供了一个“足够好”的抽象,不会干扰你。基于经过良好测试的组件和已建立的概念,而不是重新发明轮子。
  • 良好的测试覆盖率 - 配备了自动测试套件,并在现实世界中定期测试。

目录

支持我们

我们在开发、维护和更新我们出色的开源项目上投入了大量的时间。你可以通过在GitHub上成为赞助商来帮助我们维持我们工作的这种高质量。赞助商将获得许多回报,有关详细信息,请参阅我们的赞助页面

让我们一起把这些项目提升到下一个层次!🚀

快速入门示例

一旦 安装,你可以使用以下代码访问HTTP Web服务器并发送大量HTTP GET请求

<?php

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();

// load a huge array of URLs to fetch
$urls = file('urls.txt');

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$q = new Clue\React\Mq\Queue(3, null, function ($url) use ($browser) {
    return $browser->get($url);
});

foreach ($urls as $url) {
    $q($url)->then(function (Psr\Http\Message\ResponseInterface $response) use ($url) {
        echo $url . ': ' . $response->getBody()->getSize() . ' bytes' . PHP_EOL;
    }, function (Exception $e) {
        echo 'Error: ' . $e->getMessage() . PHP_EOL;
    });
}

另请参阅示例

用法

队列

队列负责管理您的操作并确保一次不要执行太多操作。它是对漏桶算法的一种非常简单和轻量级的内存实现。

这意味着您可以控制可以并发执行的操作数量。如果您向队列中添加一个作业,并且它仍然低于限制,它将立即执行。如果您继续向队列中添加新的作业,并且其并发限制达到,它将不会启动新的操作,而是将此作业排队以供将来执行。一旦某个待处理的操作完成,它将从队列中取出下一个作业并执行此操作。

可以使用new Queue(int $concurrency, ?int $limit, callable(mixed):PromiseInterface $handler)调用创建一个新的队列实例。您可以创建任意数量的队列,例如当您想对不同类型的操作应用不同的限制时。

$concurrency参数设置了一个新的软限制,即一次可以处理的作业的最大数量。找到合适的并发限制取决于您的特定用例。通常,将并发限制限制在一个相对较小的值是很常见的,因为同时做超过十件事可能会轻易压倒接收方。

$limit参数设置了一个新的硬限制,即一次可能有多少作业处于待处理状态(保存在内存中)。根据您的特定用例,通常可以安全地保留几百或几千个作业在内存中。如果您不想应用上限,可以传递一个null值,这在语义上比传递一个大数字更有意义。

// handle up to 10 jobs concurrently, but keep no more than 1000 in memory
$q = new Queue(10, 1000, $handler);
// handle up to 10 jobs concurrently, do not limit queue size
$q = new Queue(10, null, $handler);
// handle up to 10 jobs concurrently, reject all further jobs
$q = new Queue(10, 10, $handler);

$handler参数必须是一个有效的可调用函数,它接受您的作业参数,调用适当的操作,并返回一个Promise作为其未来结果的占位符。

// using a Closure as handler is usually recommended
$q = new Queue(10, null, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$q = new Queue(10, null, array($browser, 'get'));

承诺

这个库假设您想要并发处理基于Promise的API的异步操作。

出于演示目的,本文档中的示例使用ReactPHP的异步HTTP客户端,但您可以使用任何与此项目兼容的Promise-based API。它的API可以像这样使用

$browser = new React\Http\Browser();

$promise = $browser->get($url);

如果您将其包裹在上述Queue实例中,此代码将看起来像这样

$browser = new React\Http\Browser();

$q = new Queue(10, null, function ($url) use ($browser) {
    return $browser->get($url);
});

$promise = $q($url);

$q实例是可调用的,因此调用$q(...$args)实际上将被转发为$browser->get(...$args),正如在并发限制以下时在$handler参数中给出的那样。

每个操作都期望是异步的(非阻塞的),因此您实际上可以并发调用多个操作(并行发送多个请求)。$handler负责对每个请求响应一个解析值,顺序不保证。这些操作使用基于Promise的接口,这使得它容易响应操作何时完成(即成功解决或因错误而被拒绝)。

$promise->then(
    function ($result) {
        var_dump('Result received', $result);
    },
    function (Exception $error) {
        var_dump('There was an error', $error->getMessage());
    }
);

每个操作可能需要一些时间才能完成,但由于其异步性质,您实际上可以开始任意数量的(排队)操作。一旦达到并发限制,此调用将简单地排队,并且这将返回一个挂起的Promise,一旦另一个操作完成,它将启动实际操作。这意味着这是完全透明的处理,您不需要自己担心此并发限制。

如果您觉得这很奇怪,您还可以使用更传统的阻塞API

取消

返回的Promise是以一种方式实现的,可以在它仍然是挂起状态时取消。取消挂起的操作将调用其取消处理程序,该处理程序负责使用异常拒绝其值,并清理任何底层资源。

$promise = $q($url);

Loop::addTimer(2.0, function () use ($promise) {
    $promise->cancel();
});

同样,取消一个已排队但尚未启动的操作将直接被拒绝,而不会启动该操作。

超时

默认情况下,该库不限制单个操作可以持续多长时间,因此生成的promise可能会长时间挂起。许多用例都涉及到某种“超时”逻辑,以便在达到一定阈值后取消操作。

您可以使用与上一章中相同的方式使用取消,或者您可能想查看使用react/promise-timer,它通过简单的API帮助处理这个问题。

应用超时后的代码看起来可能如下所示

use React\Promise\Timer;

$q = new Queue(10, null, function ($uri) use ($browser) {
    return Timer\timeout($browser->get($uri), 2.0);
});

$promise = $q($uri);

生成的promise可以像通常一样消费,上述代码将确保此操作的执行时间不会超过指定的超时时间(即在实际上开始执行之后)。特别是要注意这与将超时应用于生成的promise的区别。以下代码将确保排队和执行此操作的总时间不会超过指定的超时时间

// usually not recommended
$promise = Timer\timeout($q($url), 2.0);

有关更多详细信息,请参阅react/promise-timer

all()

静态方法all(int $concurrency, array<TKey,TIn> $jobs, callable(TIn):PromiseInterface<TOut> $handler): PromiseInterface<array<TKey,TOut>>可用于通过给定的$handler并发处理所有给定的作业。

这是一个方便的方法,它使用内部Queue来安排所有作业,同时限制并发性以确保一次最多运行不超过$concurrency个作业。它将返回一个promise,该promise在成功时解析为所有作业的结果。

$browser = new React\Http\Browser();

$promise = Queue::all(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});

$promise->then(function (array $responses) {
    echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

如果任何一个作业失败,它将拒绝生成的promise并尝试取消所有未完成的作业。同样,在生成的promise上调用cancel()也将尝试取消所有未完成的作业。有关详细信息,请参阅promises取消

$concurrency参数设置了一个新的软限制,用于处理的最大并发作业数。找到合适的并发限制取决于您的特定用例。通常会将并发限制在一个相对较小的值,因为同时做超过十几件事情可能会轻易压倒接收方。使用1值将确保所有作业一个接一个地处理,从而有效地创建一个“瀑布”式的作业流。使用小于1的值将导致没有处理任何作业的情况下拒绝InvalidArgumentException

// handle up to 10 jobs concurrently
$promise = Queue::all(10, $jobs, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Queue::all(1, $jobs, $handler);

$jobs参数必须是一个包含所有要处理的作业的数组。该数组中的每个值都将传递给$handler以启动一个作业。数组键将在生成的数组中保留,而数组值将被$handler返回的作业结果替换。如果此数组为空,此方法将解析为空数组而不会处理任何作业。

$handler参数必须是一个有效的可调用函数,它接受您的作业参数,调用适当的操作,并返回一个Promise作为其未来结果的占位符。如果给定的参数不是一个有效的可调用函数,此方法将拒绝InvalidArgumentException而不会处理任何作业。

// using a Closure as handler is usually recommended
$promise = Queue::all(10, $jobs, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Queue::all(10, $jobs, array($browser, 'get'));

请注意,返回响应消息数组意味着整个响应体都必须保留在内存中。

any()

静态方法any(int $concurrency, array<TKey,TIn> $jobs, callable(TIn):Promise<TOut> $handler): PromiseInterface<TOut>可用于通过给定的$handler并发处理给定的作业,并解析为第一个解析值。

这是一个方便的方法,它使用内部Queue来安排所有作业,同时限制并发性以确保一次最多运行不超过$concurrency个作业。它将返回一个promise,该promise在成功时解析为第一个作业的结果,然后尝试取消所有未完成的作业。

$browser = new React\Http\Browser();

$promise = Queue::any(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});

$promise->then(function (ResponseInterface $response) {
    echo 'First response: ' . $response->getBody() . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

如果所有任务都失败,它将拒绝生成的承诺。同样,在生成的承诺上调用 cancel() 将尝试取消所有未完成的任务。有关详细信息,请参阅 承诺取消

$concurrency参数设置了一个新的软限制,用于处理的最大并发作业数。找到合适的并发限制取决于您的特定用例。通常会将并发限制在一个相对较小的值,因为同时做超过十几件事情可能会轻易压倒接收方。使用1值将确保所有作业一个接一个地处理,从而有效地创建一个“瀑布”式的作业流。使用小于1的值将导致没有处理任何作业的情况下拒绝InvalidArgumentException

// handle up to 10 jobs concurrently
$promise = Queue::any(10, $jobs, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Queue::any(1, $jobs, $handler);

$jobs 参数必须是一个包含所有待处理任务的数组。数组中的每个值都将传递给 $handler 以启动一个任务。数组键没有影响,承诺将简单地解析为 $handler 返回的第一个成功任务的作业结果。如果此数组为空,则此方法将拒绝而不处理任何作业。

$handler 参数必须是一个有效的可调用参数,它接受您的作业参数,调用适当的操作,并返回一个承诺作为其未来结果的占位符。如果给定参数不是有效的可调用参数,则此方法将拒绝处理任何作业并抛出 InvalidArgumentExceptionn

// using a Closure as handler is usually recommended
$promise = Queue::any(10, $jobs, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Queue::any(10, $jobs, array($browser, 'get'));

阻塞

如上所述,这个库默认提供了一个强大、异步的API。

您还可以使用 reactphp/async 将其集成到您的传统、阻塞环境中。这允许您像这样简单地等待异步HTTP请求

use function React\Async\await;

$browser = new React\Http\Browser();

$promise = Queue::all(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});

try {
    $responses = await($promise);
    // responses successfully received
} catch (Exception $e) {
    // an error occurred while performing the requests
}

同样,您也可以将此包装在一个函数中,以提供简单的API并隐藏所有异步细节

use function React\Async\await; 

/**
 * Concurrently downloads all the given URIs
 *
 * @param string[] $uris       list of URIs to download
 * @return ResponseInterface[] map with a response object for each URI
 * @throws Exception if any of the URIs can not be downloaded
 */
function download(array $uris)
{
    $browser = new React\Http\Browser();

    $promise = Queue::all(3, $uris, function ($uri) use ($browser) {
        return $browser->get($uri);
    });

    return await($promise);
}

这得益于PHP 8.1+中可用的fibers以及我们的兼容性API,后者也适用于所有支持的PHP版本。请参阅 reactphp/async 获取更多详细信息。

请注意,返回响应消息数组意味着整个响应体都必须保留在内存中。

安装

推荐使用 Composer 安装此库。 您是Composer新手吗?

此项目遵循 SemVer。这将安装最新支持版本

composer require clue/mq-react:^1.6

有关版本升级的详细信息,请参阅 CHANGELOG

此项目旨在在任何平台上运行,因此不需要任何PHP扩展,并支持在PHP 5.3到当前PHP 8+上运行。强烈建议您使用此项目的最新支持版本。

测试

要运行测试套件,您首先需要克隆此存储库,然后通过 Composer 安装所有依赖项

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

测试套件已配置为始终确保所有支持环境中的100%代码覆盖率。如果您已安装Xdebug扩展,您还可以本地生成代码覆盖率报告,如下所示

XDEBUG_MODE=coverage vendor/bin/phpunit --coverage-text

许可证

此项目在 MIT许可证 下发布。

我想感谢德国户外装备和服装在线零售商 Bergfreunde GmbH 对第一个版本的支持!🎉感谢像这样的赞助商,他们理解开源开发的重要性,我才能证明在开源开发上花费时间和精力是有意义的,而不是传统的付费工作。

你知道吗?我提供定制开发服务,为版本发布和贡献提供赞助发票。如需了解详情,请联系我(@clue)。