chickentom888 / rsmq
Redis 简单消息队列。
Requires
- php: ^7.4|^8.0
- ext-mbstring: *
- predis/predis: ^1.1
Requires (Dev)
- mockery/mockery: ^1.3
- phpstan/phpstan: ^0.11.12 || ^0.12.0 || ^1.0.0
- phpunit/phpunit: ^8.3 || ^9.0
- roave/security-advisories: dev-master
- squizlabs/php_codesniffer: ^3.5
- vimeo/psalm: ^3.12 || ^4.0
This package is auto-updated.
Last update: 2024-09-20 06:01:33 UTC
README
这是一个用于PHP的轻量级消息队列,不需要专门的队列服务器。只需要一个Redis服务器。有关更多信息,请参阅 smrchy/rsmq。
这是基于 eislambey/php-rsmq 的分支,以下是变更内容
- 使用 predis 而不是Redis扩展
- 为 QueueAttributes 和 Message 提供了一些OO包装器
- 提供了一个简单的 QueueWorker
目录
安装
composer require chickentom888/php-rsmq
方法
构造函数
创建一个新的RSMQ实例。
参数
$predis(\Predis\ClientInterface): *必需 The Predis实例$ns(string): 可选 (默认: "rsmq") RSMQ创建的所有键的命名空间前缀$realtime(Boolean): 可选 (默认: false) 启用实时PUBLISH新消息
示例
<?php use Predis\Client; use ChickenTom888\RSMQ\RSMQClient; $predis = new Client( [ 'host' => '127.0.0.1', 'port' => 6379 ] ); $prefix = 'rsmq'; $this->rsmq = new RSMQClient($predis, $prefix);
队列
创建队列
创建一个新队列。
参数
$name(string): 队列名称。最大160个字符;允许字母数字字符、连字符 (-) 和下划线 (_)。$vt(int): 可选 (默认: 30) 从队列接收的消息在请求接收消息的其他接收组件中将不可见的秒数。允许的值:0-9999999(约115天)$delay(int): 可选 (默认: 0) 队列中所有新消息投递的延迟时间(以秒为单位)。允许的值:0-9999999(约115天)$maxsize(int): 可选 (默认: 65536) 消息的最大大小(以字节为单位)。允许的值:1024-65536和-1(无限大小)
返回
true(Bool)
抛出
\ChickenTom888\RSMQ\Exceptions\QueueAlreadyExistsException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $rsmq->createQueue('myqueue');
列出队列
列出所有队列
返回一个数组
["qname1", "qname2"]
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $queues = $rsmq->listQueues();
删除队列
删除队列及其所有消息。
参数
$name(string): 队列名称。
返回
true(Bool)
抛出
\ChickenTom888\RSMQ\Exceptions\QueueNotFoundException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $rsmq->deleteQueue('myqueue');
获取队列属性
获取队列属性、计数器和统计信息
参数
$queue(string): 队列名称。
返回一个具有以下属性的 \ChickenTom888\RSMQ\QueueAttributes 对象
vt(int): 队列的可见性超时时间(以秒为单位)delay(int): 新消息的延迟时间(以秒为单位)maxSize(int): 消息的最大大小(以字节为单位)totalReceived(int): 从队列接收到的消息总数totalSent(int): 发送到队列的消息总数created(float): 队列创建的时间戳(以秒为单位的纪元)modified(float): 上次使用setQueueAttributes修改队列的时间戳(以秒为单位的纪元)messageCount(int): 队列中的当前消息数hiddenMessageCount(int): 当前隐藏/不可见消息的数量。消息可以在由于 "vt" 参数或使用 "delay" 发送时处于 "in flight" 状态而隐藏。
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $attributes = $rsmq->getQueueAttributes('myqueue'); echo "visibility timeout: ", $attributes->getVt(), "\n"; echo "delay for new messages: ", $attributes->getDelay(), "\n"; echo "max size in bytes: ", $attributes->getMaxSize(), "\n"; echo "total received messages: ", $attributes->getTotalReceived(), "\n"; echo "total sent messages: ", $attributes->getTotalSent(), "\n"; echo "created: ", $attributes->getCreated(), "\n"; echo "last modified: ", $attributes->getModified(), "\n"; echo "current n of messages: ", $attributes->getMessageCount(), "\n"; echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";
设置队列属性
设置队列参数。
参数
$queue(string): 队列名称。$vt(int): 可选 * 从队列接收到的消息在请求接收消息的其他组件看来将不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)$delay(int): 可选 队列中所有新消息投递的延迟时间,单位为秒。允许的值:0-9999999(约115天)$maxsize(int): 可选 消息的最大大小,单位为字节。允许的值:1024-65536和-1(表示无限制大小)
注意:至少必须提供以下属性之一(vt, delay, maxsize)。只有提供的属性才会被修改。
返回一个具有以下属性的 \ChickenTom888\RSMQ\QueueAttributes 对象
vt(int): 队列的可见性超时时间(以秒为单位)delay(int): 新消息的延迟时间(以秒为单位)maxSize(int): 消息的最大大小(以字节为单位)totalReceived(int): 从队列接收到的消息总数totalSent(int): 发送到队列的消息总数created(float): 队列创建的时间戳(以秒为单位的纪元)modified(float): 上次使用setQueueAttributes修改队列的时间戳(以秒为单位的纪元)messageCount(int): 队列中的当前消息数hiddenMessageCount(int): 当前隐藏/不可见消息的数量。消息可以在由于 "vt" 参数或使用 "delay" 发送时处于 "in flight" 状态而隐藏。
抛出
\ChickenTom888\RSMQ\QueueAttributes\ChickenTom888\RSMQ\QueueParametersValidationException\ChickenTom888\RSMQ\QueueNotFoundException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $vt = 50; $delay = 10; $maxsize = 2048; $rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);
消息
发送消息
发送一条新消息。
参数
$queue(string)$message(string)$delay(int): 可选 (默认:队列设置) 消息投递的延迟时间,单位为秒。允许的值:0-9999999(约115天)
返回
$id(string): 内部消息ID。
抛出
\ChickenTom888\RSMQ\Exceptions\MessageToLongException\ChickenTom888\RSMQ\Exceptions\QueueNotFoundException\ChickenTom888\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('myqueue', 'a message'); echo "Message Sent. ID: ", $id;
接收消息
从队列接收下一条消息。
参数
$queue(string): 队列名称。$vt(int): 可选 (默认:队列设置) 接收到的消息对其他人不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)
返回一个具有以下属性的 \ChickenTom888\RSMQ\Message 对象
message(string): 消息内容。id(string): 内部消息ID。sent(int): 消息发送/创建的时间戳。firstReceived(int): 消息首次接收的时间戳。receiveCount(int): 消息被接收的次数。
注意:如果没有消息,将返回一个空数组
抛出
\ChickenTom888\RSMQ\Exceptions\QueueNotFoundException\ChickenTom888\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->receiveMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
删除消息
参数
$queue(string): 队列名称。$id(string): 要删除的消息ID。
返回
true表示成功,false表示消息未找到(bool)。
抛出
\ChickenTom888\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('queue', 'a message'); $rsmq->deleteMessage('queue', $id);
弹出消息
从队列接收下一条消息 并删除它。
重要: 此方法会立即删除接收到的消息。如果在处理消息时出现错误,将无法再次接收该消息。
参数
$queue(string): 队列名称。
返回一个具有以下属性的 \ChickenTom888\RSMQ\Message 对象
message(string): 消息内容。id(string): 内部消息ID。sent(int): 消息发送/创建的时间戳。firstReceived(int): 消息首次接收的时间戳。receiveCount(int): 消息被接收的次数。
注意:如果没有消息,将返回一个空对象
抛出
\ChickenTom888\RSMQ\Exceptions\QueueNotFoundException\ChickenTom888\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->popMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
更改消息可见性
更改单个消息的可见性计时器。消息再次可见的时间是从当前时间(now)加上 vt 计算得出的。
参数
qname(string): 队列名称。id(string): 消息ID。vt(int): 此消息不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)
返回
true表示成功,false表示消息未找到(bool)。
抛出
\ChickenTom888\RSMQ\Exceptions\QueueParametersValidationException\ChickenTom888\RSMQ\Exceptions\QueueNotFoundException
示例
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $id = $rsmq->sendMessage($queue, 'a message'); if($rsmq->changeMessageVisibility($queue, $id, 60)) { echo "Message hidden for 60 secs"; }
实时
在创建 ChickenTom888\RSMQ\RSMQClient 实例时,您可以通过将 \ChickenTom888\RSMQ\RSMQClient::__construct 的 $realtime 参数传递为 true 来启用新消息的实时 PUBLISH。每当通过 sendMessage 发送新消息时,都会向 {rsmq.ns}:rt:{qname} 发出 Redis PUBLISH 命令。
RSMQ 默认设置的示例
- 队列
testQueue已包含5条消息。 - 正在向队列
testQueue发送一条新消息。 - 将执行以下 Redis 命令:
PUBLISH rsmq:rt:testQueue 6
实时选项允许在向RSMQ发送新消息时发送一个PUBLISH,但是没有在此基础上构建更多功能。您的应用程序可以使用Redis的SUBSCRIBE命令来接收新消息的通知,然后尝试从队列中进行轮询。然而,由于Redis的pub/sub系统的工作方式,所有监听器都会收到新消息的通知,这种方法不适合在拥有多个订阅进程的环境中驱动消息处理。
队列工作者
QueueWorker类提供了一种简单的方式来消费RSMQ消息,要使用它
<?php /** * @var ChickenTom888\RSMQ\RSMQClientInterface $rsmq */ use ChickenTom888\RSMQ\ExecutorInterface; use ChickenTom888\RSMQ\Message; use ChickenTom888\RSMQ\QueueWorker; use ChickenTom888\RSMQ\WorkerSleepProvider; $executor = new class() implements ExecutorInterface{ public function __invoke(Message $message) : bool { //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish" return true; } }; $sleepProvider = new class() implements WorkerSleepProvider{ public function getSleep() : ?int { /** * This allows you to return null to stop the worker, which can be used with something like redis to mark. * * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject * before we process a message. */ return 1; } }; $worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue'); $worker->work(); // here we can optionally pass true to only process one message
授权协议
MIT许可证。请参阅LICENSE