andrewbreksa / rsmq
Redis 简易消息队列。
Requires
- php: ^7.4|^8.0
- ext-mbstring: *
- predis/predis: ^1.1
Requires (Dev)
- mockery/mockery: ^1.3
- phpstan/phpstan: ^0.11.12 || ^0.12.0 || ^1.0.0
- phpunit/phpunit: ^8.3 || ^9.0
- roave/security-advisories: dev-master
- squizlabs/php_codesniffer: ^3.5
- vimeo/psalm: ^3.12 || ^4.0
- dev-master
- 2.0.2
- 2.0.1
- 2.0.0
- 1.2.0
- 1.0.2
- 1.0.1
- 1.0.0
- 0.2.1
- 0.2.0
- v0.1.0
- dev-dependabot/composer/phpunit/phpunit-9.6.7
- dev-dependabot/composer/vimeo/psalm-5.9.0
- dev-dependabot/composer/predis/predis-2.1.2
- dev-dependabot/composer/squizlabs/php_codesniffer-3.7.2
- dev-dependabot/composer/mockery/mockery-1.5.1
- dev-cross-client-tests
This package is auto-updated.
Last update: 2024-09-15 02:14:43 UTC
README
这是一个轻量级的 PHP 消息队列,不需要专用的队列服务器。只需要 Redis 服务器。有关更多信息,请参阅 smrchy/rsmq。
这是对 eislambey/php-rsmq 的修改版,以下为修改内容:
- 使用 predis 而不是 Redis 扩展
- 为 QueueAttributes 和 Message 提供了一些面向对象的包装器
- 提供了一个简单的 QueueWorker
目录
安装
composer require andrewbreksa/rsmq
方法
构造
创建 RSMQ 的新实例。
参数
$predis
(\Predis\ClientInterface): *必需 The Predis 实例$ns
(string): 可选 (默认: "rsmq") RSMQ 创建的所有键使用的命名空间前缀$realtime
(Boolean): 可选 (默认: false) 启用新消息的实时 PUBLISH
示例
<?php use Predis\Client; use AndrewBreksa\RSMQ\RSMQClient; $predis = new Client( [ 'host' => '127.0.0.1', 'port' => 6379 ] ); $this->rsmq = new RSMQClient($predis);
队列
createQueue
创建一个新的队列。
参数
$name
(string): 队列名称。最大 160 个字符;允许字母数字字符、连字符 (-) 和下划线 (_)。$vt
(int): 可选 (默认: 30) 从队列接收到的消息在请求接收消息的其他接收组件中不可见的时间长度(以秒为单位)。允许的值:0-9999999(约 115 天)$delay
(int): 可选 (默认: 0) 队列中所有新消息投递的延迟时间(以秒为单位)。允许的值:0-9999999(约 115 天)$maxsize
(int): 可选 (默认: 65536) 消息的最大大小(以字节为单位)。允许的值:1024-65536 和 -1(表示无限大小)
返回
true
(Bool)
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $rsmq->createQueue('myqueue');
listQueues
列出所有队列
返回一个数组
["qname1", "qname2"]
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queues = $rsmq->listQueues();
deleteQueue
删除队列及其所有消息。
参数
$name
(string): 队列名称。
返回
true
(Bool)
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $rsmq->deleteQueue('myqueue');
getQueueAttributes
获取队列属性、计数器和统计信息
参数
$queue
(string): 队列名称。
返回一个 \AndrewBreksa\RSMQ\QueueAttributes
对象,具有以下属性
vt
(int): 队列的可见性超时(以秒为单位)delay
(int): 新消息的延迟(以秒为单位)maxSize
(int): 消息的最大大小(以字节为单位)totalReceived
(int): 从队列接收到的总消息数totalSent
(int): 发送到队列的总消息数created
(float): 队列创建的戳记(以秒为单位的纪元时间)modified
(float): 队列最后通过setQueueAttributes
修改的戳记(以秒为单位的纪元时间)messageCount
(int): 队列中的当前消息数hiddenMessageCount
(int): 当前隐藏/不可见消息的数量。消息可能在“在途中”时由于vt
参数或带有delay
延迟发送而隐藏。
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $attributes = $rsmq->getQueueAttributes('myqueue'); echo "visibility timeout: ", $attributes->getVt(), "\n"; echo "delay for new messages: ", $attributes->getDelay(), "\n"; echo "max size in bytes: ", $attributes->getMaxSize(), "\n"; echo "total received messages: ", $attributes->getTotalReceived(), "\n"; echo "total sent messages: ", $attributes->getTotalSent(), "\n"; echo "created: ", $attributes->getCreated(), "\n"; echo "last modified: ", $attributes->getModified(), "\n"; echo "current n of messages: ", $attributes->getMessageCount(), "\n"; echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";
setQueueAttributes
设置队列参数。
参数
$queue
(string): 队列名称。$vt
(int): 可选 * 从队列接收到的消息将不可见的时间长度,单位为秒,当其他接收组件请求接收消息时。允许的值:0-9999999(约115天)$delay
(int): 可选 队列中所有新消息的投递将延迟的时间,单位为秒。允许的值:0-9999999(约115天)$maxsize
(int): 可选 消息的最大大小,单位为字节。允许的值:1024-65536和-1(表示无限制大小)
注意:至少必须提供一个属性(vt、delay、maxsize)。只有提供的属性才会被修改。
返回一个 \AndrewBreksa\RSMQ\QueueAttributes
对象,具有以下属性
vt
(int): 队列的可见性超时(以秒为单位)delay
(int): 新消息的延迟(以秒为单位)maxSize
(int): 消息的最大大小(以字节为单位)totalReceived
(int): 从队列接收到的总消息数totalSent
(int): 发送到队列的总消息数created
(float): 队列创建的戳记(以秒为单位的纪元时间)modified
(float): 队列最后通过setQueueAttributes
修改的戳记(以秒为单位的纪元时间)messageCount
(int): 队列中的当前消息数hiddenMessageCount
(int): 当前隐藏/不可见消息的数量。消息可能在“在途中”时由于vt
参数或带有delay
延迟发送而隐藏。
抛出
\AndrewBreksa\RSMQ\QueueAttributes
\AndrewBreksa\RSMQ\QueueParametersValidationException
\AndrewBreksa\RSMQ\QueueNotFoundException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $vt = 50; $delay = 10; $maxsize = 2048; $rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);
消息
sendMessage
发送新的消息。
参数
$queue
(string)$message
(string)$delay
(int): 可选 (默认:队列设置) 消息投递将延迟的时间,单位为秒。允许的值:0-9999999(约115天)
返回
$id
(string): 内部消息ID。
抛出
\AndrewBreksa\RSMQ\Exceptions\MessageToLongException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('myqueue', 'a message'); echo "Message Sent. ID: ", $id;
receiveMessage
从队列接收下一个消息。
参数
$queue
(string): 队列名称。$vt
(int): 可选 (默认:队列设置) 接收到的消息将对其他人不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)
返回一个具有以下属性的\AndrewBreksa\RSMQ\Message
对象
message
(string): 消息内容。id
(string): 内部消息ID。sent
(int): 此消息发送/创建的时间戳。firstReceived
(int): 此消息首次接收的时间戳。receiveCount
(int): 此消息被接收的次数。
注意:如果没有消息,将返回一个空数组
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->receiveMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
deleteMessage
参数
$queue
(string): 队列名称。$id
(string): 要删除的消息ID。
返回
- 如果成功,返回
true
,如果消息未找到,返回false
(bool)。
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $id = $rsmq->sendMessage('queue', 'a message'); $rsmq->deleteMessage('queue', $id);
popMessage
从队列接收下一个消息 并删除它。
重要:此方法会立即删除接收到的消息。如果在处理消息时出现问题,将无法再次接收该消息。
参数
$queue
(string): 队列名称。
返回一个具有以下属性的\AndrewBreksa\RSMQ\Message
对象
message
(string): 消息内容。id
(string): 内部消息ID。sent
(int): 此消息发送/创建的时间戳。firstReceived
(int): 此消息首次接收的时间戳。receiveCount
(int): 此消息被接收的次数。
注意:如果没有消息,将返回一个空对象
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $message = $rsmq->popMessage('myqueue'); echo "Message ID: ", $message->getId(); echo "Message: ", $message->getMessage();
changeMessageVisibility
更改单个消息的可见性计时器。消息将再次可见的时间是从当前时间(现在)加上vt
计算得出的。
参数
qname
(string): 队列名称。id
(string): 消息ID。vt
(int): 此消息不可见的时间长度,单位为秒。允许的值:0-9999999(约115天)
返回
- 如果成功,返回
true
,如果消息未找到,返回false
(bool)。
抛出
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
示例
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ $queue = 'myqueue'; $id = $rsmq->sendMessage($queue, 'a message'); if($rsmq->changeMessageVisibility($queue, $id, 60)) { echo "Message hidden for 60 secs"; }
实时
在创建AndrewBreksa\RSMQ\RSMQClient
实例时,您可以通过传递\AndrewBreksa\RSMQ\RSMQClient::__construct
的$realtime
参数为true
来启用新消息的实时PUBLISH
。每当通过sendMessage
发送新消息时,都会向{rsmq.ns}:rt:{qname}
发出Redis PUBLISH
。
带有默认设置的RSMQ示例
- 队列
testQueue
已经包含5条消息。 - 正在向队列
testQueue
发送新消息。 - 将发出以下Redis命令:
PUBLISH rsmq:rt:testQueue 6
实时选项允许在将新消息发送到RSMQ时发送一个PUBLISH
,但是没有在此基础上构建更多功能。您的应用程序可以使用Redis的SUBSCRIBE
命令来接收新消息的通知,然后尝试从队列中轮询,但是由于Redis pub/sub系统的运作方式,所有监听器都将收到新消息的通知,这种方法不适合在拥有多个订阅进程的环境中处理消息。
QueueWorker
QueueWorker类提供了一个方便的方式来消费RSMQ消息,要使用它
<?php /** * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq */ use AndrewBreksa\RSMQ\ExecutorInterface; use AndrewBreksa\RSMQ\Message; use AndrewBreksa\RSMQ\QueueWorker; use AndrewBreksa\RSMQ\WorkerSleepProvider; $executor = new class() implements ExecutorInterface{ public function __invoke(Message $message) : bool { //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish" return true; } }; $sleepProvider = new class() implements WorkerSleepProvider{ public function getSleep() : ?int { /** * This allows you to return null to stop the worker, which can be used with something like redis to mark. * * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject * before we process a message. */ return 1; } }; $worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue'); $worker->work(); // here we can optionally pass true to only process one message
许可证
MIT许可。请参阅LICENSE