byjg / rabbitmq-client
使用byjg/message-queue-client组件的最小RabbitMQ客户端
4.9.0
2024-01-04 21:27 UTC
Requires
- php: >=7.4
- ext-curl: *
- byjg/message-queue-client: 4.9.*
- php-amqplib/php-amqplib: ^3.5
Requires (Dev)
- phpunit/phpunit: 5.7.*|7.4.*|^9.5
README
它创建了一个简单的抽象层,用于通过byjg/message-queue-client组件从RabbitMQ服务器发布和消费消息。
有关如何使用消息队列客户端的详细信息,请参阅文档
用法
发布
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(RabbitMQConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Create a message $message = new Message("Hello World"); // Publish the message into the queue $connector->publish(new Envelope($pipe, $message));
消费
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(RabbitMQConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("amqp://$user:$pass@$host:$port/$vhost")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Connect to the queue and wait to consume the message $connector->consume( $pipe, // Queue name function (Envelope $envelope) { // Callback function to process the message echo "Process the message"; echo $envelope->getMessage()->getBody(); return Message::ACK; }, function (Envelope $envelope, $ex) { // Callback function to process the failed message echo "Process the failed message"; echo $ex->getMessage(); return Message::REQUEUE; } );
消费方法将等待消息并调用回调函数处理消息。如果队列中没有消息,该方法将等待直到有消息到达。
如果您想退出消费方法,只需从回调函数中返回Message::ACK | Message::EXIT
。
回调函数可能的返回值
Message::ACK
- 识别消息并从队列中删除Message::NACK
- 不识别消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。Message::REQUEUE
- 重新入队消息Message::EXIT
- 退出消费方法
RabbitMQ客户端(AMQP协议)
RabbitMQ连接器使用php-amqplib库。
连接器的标准行为是创建一个交换器、一个队列,并将队列绑定到交换器上,使用路由键(默认与队列名称相同)。所有消息都发布到交换器,从队列中消费。
由于连接器创建了队列和交换器,因此建议您不要从现有的队列中发布/消费。如果您使用现有的队列,可能会遇到错误
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: PRECONDITION_FAILED - Existing queue 'test' declared with other arguments in AMQPChannel.php:224
您可以通过使用Pipe::withProperty()
和Message::withProperty()
方法来更改连接的行为。其中一些默认值已被RabbitMQConnector设置
Pipe::withProperty(RabbitMQConnector::EXCHANGE)
- 设置交换器名称。默认为队列名称。Pipe::withProperty(RabbitMQConnector::ROUTING_KEY)
- 设置路由键。默认为队列名称。Pipe::withProperty('x-message-ttl')
- 仅影响死信队列。设置消息的存活时间为毫秒。默认3天。Pipe::withProperty('x-expires')
- 仅影响死信队列。设置队列的存活时间为毫秒。默认3天。Message::withProperty('content_type')
- 设置消息的内容类型。默认为text/plain。Message::withProperty('delivery_mode')
- 设置消息的投递模式。默认为2(持久)。
协议