m6web/redis-message-broker

此包已被废弃,不再维护。没有建议的替代包。

基于redis的消息代理组件

v0.6.0 2018-09-18 15:21 UTC

README

此组件可以帮助您在redis后端上构建消息代理系统。它将利用redis集群功能,在产生消息时将消息分片到多个redis列表中。在非自动确认模式下,消费者实现了一个工作列表,以确保在处理失败时不会丢失任何消息。

按照设计,产生非常快(一个Redis命令),如果手动确认消息,消费可能会较慢。

您应该使用Redis >= 2.8。

使用方法

生产者

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$message = new RedisMessageBroker\MessageEnvelope(uniqid(), 'un message');

$producer = new RedisMessageBroker\MessageHandler\Producer($queue, $redisClient);
$producer->publishMessage($message);

消费者

消费者应该被包装在工作者中。应向消费者构造函数传递一个唯一ID。如果您使用工作者,uniqId必须对每个工作者是常数。

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid());

$message = $consumer->getMessageEnvelope();

检查器

检查器方法允许您计算队列中准备好或正在处理的消息数量。

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');

$inspector = new RedisMessageBroker\Queue\Inspector($queue, $redisClient);
$countInProgress = $inspector->countInProgressMessages();
$countReady = $inspector->countReadyMessages();

清理

清理方法允许您在消息队列中执行清理。清理非常慢,因为需要扫描队列中的所有消息。

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');

$cleanup = new RedisMessageBroker\Queue\Cleanup($queue, $redisClient);
$cleanup->cleanOldMessages(
    3600, // erase messages older than 3600 seconds
    true  // clean message in the ready queue too. Mandatory use if you are in no-autoack mode 
);

队列选项

为了避免热点,可以在多个列表上对队列进行分片

$queue = new RedisMessageBroker\Queue\Definition('raoul', 10); // shard on 10 lists

在此模式下,消息将在10个列表之间写入和读取。FIFO不再保证。

消费者选项

手动消息确认

使用setNoAutoAck()

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid());
$consumer->setNoAutoAck();

$message = $consumer->getMessageEnvelope();
if ($message) {
    // do something with the message
    $consumer->ack($message); // erase the message from the working list
}

查找消费者未确认的旧消息

每个消费者在对象构造期间都有一个唯一的ID。此ID允许消费者定义一个唯一的工作列表,其中消息在getMessageack之间存储。可以使用LostMessageConsumer类查找消费者工作列表,并将x秒以上(messageTtl参数)的消息从这些列表移动到队列列表中。当达到重试次数时,maxRetry参数将消息放入死信列表。

<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;

$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\LostMessagesConsumer($queue, $redisClient, 360, 3);
$consumer->requeueOldMessages();

messageTtl参数需要大于您处理消息的最大时间。(否则,在处理时将消息视为旧消息)