byjg/redis-queue-client

使用 byjg/message-queue-client 组件的 Redis 队列客户端,功能最小化

4.9.0 2024-01-04 21:38 UTC

This package is auto-updated.

Last update: 2024-09-13 00:06:57 UTC


README

Build Status Opensource ByJG GitHub source GitHub license GitHub release

它通过使用 byjg/message-queue-client 组件,为使用 Redis 发送和接收消息创建了一个简单的抽象层。

有关如何使用消息队列客户端的详细信息,请参阅 文档

用法

发布

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RedisQueueConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));

消费

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RedisQueueConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe,                                 // Queue name
    function (Envelope $envelope) {         // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) {    // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);

消费方法将等待消息并调用回调函数来处理该消息。如果队列中没有消息,该方法将等待直到有消息到达。

如果您想退出消费方法,只需从回调函数中返回 Message::ACK | Message::EXIT

回调函数可能的返回值

  • Message::ACK - 确认消息并从队列中删除
  • Message::NACK - 不确认消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。
  • Message::REQUEUE - 重新入队消息
  • Message::EXIT - 退出消费方法

协议

依赖

开源 ByJG