byjg/rabbitmq-client

使用byjg/message-queue-client组件的最小RabbitMQ客户端

4.9.0 2024-01-04 21:27 UTC

This package is auto-updated.

Last update: 2024-09-13 00:04:41 UTC


README

Build Status Opensource ByJG GitHub source GitHub license GitHub release

它创建了一个简单的抽象层,用于通过byjg/message-queue-client组件从RabbitMQ服务器发布和消费消息。

有关如何使用消息队列客户端的详细信息,请参阅文档

用法

发布

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RabbitMQConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));

消费

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(RabbitMQConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe,                                 // Queue name
    function (Envelope $envelope) {         // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) {    // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);

消费方法将等待消息并调用回调函数处理消息。如果队列中没有消息,该方法将等待直到有消息到达。

如果您想退出消费方法,只需从回调函数中返回Message::ACK | Message::EXIT

回调函数可能的返回值

  • Message::ACK - 识别消息并从队列中删除
  • Message::NACK - 不识别消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。
  • Message::REQUEUE - 重新入队消息
  • Message::EXIT - 退出消费方法

RabbitMQ客户端(AMQP协议)

RabbitMQ连接器使用php-amqplib库。

连接器的标准行为是创建一个交换器、一个队列,并将队列绑定到交换器上,使用路由键(默认与队列名称相同)。所有消息都发布到交换器,从队列中消费。

由于连接器创建了队列和交换器,因此建议您不要从现有的队列中发布/消费。如果您使用现有的队列,可能会遇到错误

PHP Fatal error:  Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: PRECONDITION_FAILED - Existing queue 'test' declared with other arguments in AMQPChannel.php:224

您可以通过使用Pipe::withProperty()Message::withProperty()方法来更改连接的行为。其中一些默认值已被RabbitMQConnector设置

  • Pipe::withProperty(RabbitMQConnector::EXCHANGE) - 设置交换器名称。默认为队列名称。
  • Pipe::withProperty(RabbitMQConnector::ROUTING_KEY) - 设置路由键。默认为队列名称。
  • Pipe::withProperty('x-message-ttl') - 仅影响死信队列。设置消息的存活时间为毫秒。默认3天。
  • Pipe::withProperty('x-expires') - 仅影响死信队列。设置队列的存活时间为毫秒。默认3天。
  • Message::withProperty('content_type') - 设置消息的内容类型。默认为text/plain。
  • Message::withProperty('delivery_mode') - 设置消息的投递模式。默认为2(持久)。

协议

依赖项

开源ByJG