m6web / redis-message-broker
此包已被废弃,不再维护。没有建议的替代包。
基于redis的消息代理组件
v0.6.0
2018-09-18 15:21 UTC
Requires
- php: >=7.1
- predis/predis: ^1.1.1
Requires (Dev)
- atoum/atoum: ~3.0.0
- m6web/php-cs-fixer-config: ^1.0
- m6web/redis-mock: ^3.3.1
README
此组件可以帮助您在redis后端上构建消息代理系统。它将利用redis集群功能,在产生消息时将消息分片到多个redis列表中。在非自动确认模式下,消费者实现了一个工作列表,以确保在处理失败时不会丢失任何消息。
按照设计,产生非常快(一个Redis命令),如果手动确认消息,消费可能会较慢。
您应该使用Redis >= 2.8。
使用方法
生产者
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $message = new RedisMessageBroker\MessageEnvelope(uniqid(), 'un message'); $producer = new RedisMessageBroker\MessageHandler\Producer($queue, $redisClient); $producer->publishMessage($message);
消费者
消费者应该被包装在工作者中。应向消费者构造函数传递一个唯一ID。如果您使用工作者,uniqId必须对每个工作者是常数。
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid()); $message = $consumer->getMessageEnvelope();
检查器
检查器方法允许您计算队列中准备好或正在处理的消息数量。
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $inspector = new RedisMessageBroker\Queue\Inspector($queue, $redisClient); $countInProgress = $inspector->countInProgressMessages(); $countReady = $inspector->countReadyMessages();
清理
清理方法允许您在消息队列中执行清理。清理非常慢,因为需要扫描队列中的所有消息。
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $cleanup = new RedisMessageBroker\Queue\Cleanup($queue, $redisClient); $cleanup->cleanOldMessages( 3600, // erase messages older than 3600 seconds true // clean message in the ready queue too. Mandatory use if you are in no-autoack mode );
队列选项
为了避免热点,可以在多个列表上对队列进行分片
$queue = new RedisMessageBroker\Queue\Definition('raoul', 10); // shard on 10 lists
在此模式下,消息将在10个列表之间写入和读取。FIFO不再保证。
消费者选项
手动消息确认
使用setNoAutoAck()
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid()); $consumer->setNoAutoAck(); $message = $consumer->getMessageEnvelope(); if ($message) { // do something with the message $consumer->ack($message); // erase the message from the working list }
查找消费者未确认的旧消息
每个消费者在对象构造期间都有一个唯一的ID。此ID允许消费者定义一个唯一的工作列表,其中消息在getMessage
和ack
之间存储。可以使用LostMessageConsumer类查找消费者工作列表,并将x秒以上(messageTtl
参数)的消息从这些列表移动到队列列表中。当达到重试次数时,maxRetry
参数将消息放入死信列表。
<?php use M6Web\Component\RedisMessageBroker; use Predis\Client as PredisClient; $redisClient = new PredisClient(); // refer to PredisDocumentation $queue = new RedisMessageBroker\Queue\Definition('raoul'); $consumer = new RedisMessageBroker\MessageHandler\LostMessagesConsumer($queue, $redisClient, 360, 3); $consumer->requeueOldMessages();
messageTtl
参数需要大于您处理消息的最大时间。(否则,在处理时将消息视为旧消息)