Redis 简易消息队列。

2.0.2 2022-04-27 04:35 UTC

README

Build Status codecov License GitHub issues Latest Stable Version Latest Unstable Version composer.lock Total Downloads GitHub stars Dependents

这是一个轻量级的 PHP 消息队列,不需要专用的队列服务器。只需要 Redis 服务器。有关更多信息,请参阅 smrchy/rsmq

这是对 eislambey/php-rsmq 的修改版,以下为修改内容:

  • 使用 predis 而不是 Redis 扩展
  • 为 QueueAttributes 和 Message 提供了一些面向对象的包装器
  • 提供了一个简单的 QueueWorker

目录

安装

composer require andrewbreksa/rsmq

方法

构造

创建 RSMQ 的新实例。

参数

  • $predis (\Predis\ClientInterface): *必需 The Predis 实例
  • $ns (string): 可选 (默认: "rsmq") RSMQ 创建的所有键使用的命名空间前缀
  • $realtime (Boolean): 可选 (默认: false) 启用新消息的实时 PUBLISH

示例

<?php
use Predis\Client;
use AndrewBreksa\RSMQ\RSMQClient;

$predis = new Client(
    [
        'host' => '127.0.0.1',
        'port' => 6379
    ]
);
$this->rsmq = new RSMQClient($predis);

队列

createQueue

创建一个新的队列。

参数

  • $name (string): 队列名称。最大 160 个字符;允许字母数字字符、连字符 (-) 和下划线 (_)。
  • $vt (int): 可选 (默认: 30) 从队列接收到的消息在请求接收消息的其他接收组件中不可见的时间长度(以秒为单位)。允许的值:0-9999999(约 115 天)
  • $delay (int): 可选 (默认: 0) 队列中所有新消息投递的延迟时间(以秒为单位)。允许的值:0-9999999(约 115 天)
  • $maxsize (int): 可选 (默认: 65536) 消息的最大大小(以字节为单位)。允许的值:1024-65536 和 -1(表示无限大小)

返回

  • true (Bool)

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$rsmq->createQueue('myqueue');

listQueues

列出所有队列

返回一个数组

  • ["qname1", "qname2"]

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queues = $rsmq->listQueues();

deleteQueue

删除队列及其所有消息。

参数

  • $name (string): 队列名称。

返回

  • true (Bool)

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$rsmq->deleteQueue('myqueue');

getQueueAttributes

获取队列属性、计数器和统计信息

参数

  • $queue (string): 队列名称。

返回一个 \AndrewBreksa\RSMQ\QueueAttributes 对象,具有以下属性

  • vt (int): 队列的可见性超时(以秒为单位)
  • delay (int): 新消息的延迟(以秒为单位)
  • maxSize (int): 消息的最大大小(以字节为单位)
  • totalReceived (int): 从队列接收到的总消息数
  • totalSent (int): 发送到队列的总消息数
  • created (float): 队列创建的戳记(以秒为单位的纪元时间)
  • modified (float): 队列最后通过 setQueueAttributes 修改的戳记(以秒为单位的纪元时间)
  • messageCount (int): 队列中的当前消息数
  • hiddenMessageCount (int): 当前隐藏/不可见消息的数量。消息可能在“在途中”时由于vt参数或带有delay延迟发送而隐藏。

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$attributes =  $rsmq->getQueueAttributes('myqueue');
echo "visibility timeout: ", $attributes->getVt(), "\n";
echo "delay for new messages: ", $attributes->getDelay(), "\n";
echo "max size in bytes: ", $attributes->getMaxSize(), "\n";
echo "total received messages: ", $attributes->getTotalReceived(), "\n";
echo "total sent messages: ", $attributes->getTotalSent(), "\n";
echo "created: ", $attributes->getCreated(), "\n";
echo "last modified: ", $attributes->getModified(), "\n";
echo "current n of messages: ", $attributes->getMessageCount(), "\n";
echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";

setQueueAttributes

设置队列参数。

参数

  • $queue (string): 队列名称。
  • $vt (int): 可选 * 从队列接收到的消息将不可见的时间长度,单位为秒,当其他接收组件请求接收消息时。允许的值:0-9999999(约115天)
  • $delay (int): 可选 队列中所有新消息的投递将延迟的时间,单位为秒。允许的值:0-9999999(约115天)
  • $maxsize (int): 可选 消息的最大大小,单位为字节。允许的值:1024-65536和-1(表示无限制大小)

注意:至少必须提供一个属性(vt、delay、maxsize)。只有提供的属性才会被修改。

返回一个 \AndrewBreksa\RSMQ\QueueAttributes 对象,具有以下属性

  • vt (int): 队列的可见性超时(以秒为单位)
  • delay (int): 新消息的延迟(以秒为单位)
  • maxSize (int): 消息的最大大小(以字节为单位)
  • totalReceived (int): 从队列接收到的总消息数
  • totalSent (int): 发送到队列的总消息数
  • created (float): 队列创建的戳记(以秒为单位的纪元时间)
  • modified (float): 队列最后通过 setQueueAttributes 修改的戳记(以秒为单位的纪元时间)
  • messageCount (int): 队列中的当前消息数
  • hiddenMessageCount (int): 当前隐藏/不可见消息的数量。消息可能在“在途中”时由于vt参数或带有delay延迟发送而隐藏。

抛出

  • \AndrewBreksa\RSMQ\QueueAttributes
  • \AndrewBreksa\RSMQ\QueueParametersValidationException
  • \AndrewBreksa\RSMQ\QueueNotFoundException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queue = 'myqueue';
$vt = 50;
$delay = 10;
$maxsize = 2048;
$rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);

消息

sendMessage

发送新的消息。

参数

  • $queue (string)
  • $message (string)
  • $delay (int): 可选 (默认:队列设置) 消息投递将延迟的时间,单位为秒。允许的值:0-9999999(约115天)

返回

  • $id (string): 内部消息ID。

抛出

  • \AndrewBreksa\RSMQ\Exceptions\MessageToLongException
  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$id = $rsmq->sendMessage('myqueue', 'a message');
echo "Message Sent. ID: ", $id;

receiveMessage

从队列接收下一个消息。

参数

  • $queue (string): 队列名称。
  • $vt (int): 可选 (默认:队列设置) 接收到的消息将对其他人不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)

返回一个具有以下属性的\AndrewBreksa\RSMQ\Message对象

  • message (string): 消息内容。
  • id (string): 内部消息ID。
  • sent (int): 此消息发送/创建的时间戳。
  • firstReceived (int): 此消息首次接收的时间戳。
  • receiveCount (int): 此消息被接收的次数。

注意:如果没有消息,将返回一个空数组

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$message = $rsmq->receiveMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();

deleteMessage

参数

  • $queue (string): 队列名称。
  • $id (string): 要删除的消息ID。

返回

  • 如果成功,返回true,如果消息未找到,返回false(bool)。

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$id = $rsmq->sendMessage('queue', 'a message');
$rsmq->deleteMessage('queue', $id);

popMessage

从队列接收下一个消息 并删除它

重要:此方法会立即删除接收到的消息。如果在处理消息时出现问题,将无法再次接收该消息。

参数

  • $queue (string): 队列名称。

返回一个具有以下属性的\AndrewBreksa\RSMQ\Message对象

  • message (string): 消息内容。
  • id (string): 内部消息ID。
  • sent (int): 此消息发送/创建的时间戳。
  • firstReceived (int): 此消息首次接收的时间戳。
  • receiveCount (int): 此消息被接收的次数。

注意:如果没有消息,将返回一个空对象

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$message = $rsmq->popMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();

changeMessageVisibility

更改单个消息的可见性计时器。消息将再次可见的时间是从当前时间(现在)加上vt计算得出的。

参数

  • qname (string): 队列名称。
  • id (string): 消息ID。
  • vt (int): 此消息不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)

返回

  • 如果成功,返回true,如果消息未找到,返回false(bool)。

抛出

  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException

示例

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

$queue = 'myqueue';
$id = $rsmq->sendMessage($queue, 'a message');
if($rsmq->changeMessageVisibility($queue, $id, 60)) {
	echo "Message hidden for 60 secs";
}

实时

在创建AndrewBreksa\RSMQ\RSMQClient实例时,您可以通过传递\AndrewBreksa\RSMQ\RSMQClient::__construct$realtime参数为true来启用新消息的实时PUBLISH。每当通过sendMessage发送新消息时,都会向{rsmq.ns}:rt:{qname}发出Redis PUBLISH

带有默认设置的RSMQ示例

  • 队列testQueue已经包含5条消息。
  • 正在向队列testQueue发送新消息。
  • 将发出以下Redis命令:PUBLISH rsmq:rt:testQueue 6

实时选项允许在将新消息发送到RSMQ时发送一个PUBLISH,但是没有在此基础上构建更多功能。您的应用程序可以使用Redis的SUBSCRIBE命令来接收新消息的通知,然后尝试从队列中轮询,但是由于Redis pub/sub系统的运作方式,所有监听器都将收到新消息的通知,这种方法不适合在拥有多个订阅进程的环境中处理消息。

QueueWorker

QueueWorker类提供了一个方便的方式来消费RSMQ消息,要使用它

<?php
/**
 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
 */

use AndrewBreksa\RSMQ\ExecutorInterface;
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\QueueWorker;
use AndrewBreksa\RSMQ\WorkerSleepProvider;

$executor = new class() implements ExecutorInterface{
    public function __invoke(Message $message) : bool {
        //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish"
        return true;
    }
};

$sleepProvider = new class() implements WorkerSleepProvider{
    public function getSleep() : ?int {
        /**
         * This allows you to return null to stop the worker, which can be used with something like redis to mark.
         *
         * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject
         * before we process a message.
         */
        return 1;
    }
};

$worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue');
$worker->work(); // here we can optionally pass true to only process one message

许可证

MIT许可。请参阅LICENSE