graze/queue

:postbox: PHP 中处理队列的灵活抽象。

v0.7.0 2018-05-25 12:50 UTC

README

PHP ~5.5 Latest Version MIT Licensed Build Status Coverage Status Quality Score Total Downloads

此库提供了一种灵活的抽象层,用于处理队列。

您可以按照您喜欢的任何方式安装它,但我们推荐使用 Composer

~$ composer require graze/queue

文档

队列操作围绕 Message 对象的列表。无论是发送一个还是多个 Message,它始终是一个数组。工作者一次只处理一个 Message 对象,无论是从队列接收一个还是多个。

use Aws\Sqs\SqsClient;
use Graze\Queue\Adapter\SqsAdapter;
use Graze\Queue\Client;
use Graze\Queue\Message\MessageInterface;

$client = new Client(new SqsAdapter(new SqsClient([
    'region'  => 'us-east-1',
    'version' => '2012-11-05',
    'credentials' => [
        'key'    => 'ive_got_the_key',
        'secret' => 'ive_got_the_secret'
    ],
]), 'queue_name'));

// Producer
$client->send([
    $client->create('foo'),
]);

// Consumer
$client->receive(function (MessageInterface $msg) {
    var_dump($msg->getBody());
    var_dump($msg->getMetadata()->getAll());
});

适配器

适配器对象用于满足对队列提供者的低级请求。

目前支持的队列提供者包括

处理器

处理器对象用于执行带有接收消息列表的工作者调用,并处理确认。

当前处理器包括

use Graze\Queue\Client;
use Graze\Queue\Adapter\ArrayAdapter;
use Graze\Queue\Handler\BatchAcknowledgementHandler;
use Graze\Queue\Message\MessageInterface;

// Create client with the Batch Acknowledgement Handler.
$client = new Client(new ArrayAdapter(), [
    'handler' => new BatchAcknowledgementHandler(),
]);

// Receive a maximum of 10 messages.
$client->receive(function (MessageInterface $message) {
    // Do some work.
}, 10);

轮询

通过将 null 作为限制参数传递给 receive 方法,支持轮询队列。传递给您的工人的第二个参数是一个 Closure,您应该使用它来在完成工作后停止轮询。确保您也使用一个能够有效确认工作的处理器!

请注意,单个适配器对象可能会随时停止轮询。可能停止的情况之一是,如果队列长度有限,并且已接收所有可能的消息。

use Graze\Queue\Client;
use Graze\Queue\Adapter\ArrayAdapter;
use Graze\Queue\Handler\BatchAcknowledgementHandler;
use Graze\Queue\Message\MessageInterface;

// Create client with the Batch Acknowledgement Handler.
$client = new Client(new ArrayAdapter(), [
    'handler' => new BatchAcknowledgeHandler(100), // Acknowledge after 100 messages.
]);

// Poll until `$done()` is called.
$client->receive(function (MessageInterface $message, Closure $done) {
    // Do some work.

    // You should always define a break condition (i.e. timeout, expired session, etc).
    if ($breakCondition) $done();
}, null);

许可

此库的内容由 Nature Delivered Ltd. 根据 MIT 许可证 发布。

您可以在 LICENSEhttps://open-source.org.cn/licenses/mit》中找到此许可证的副本。