byjg / redis-queue-client
使用 byjg/message-queue-client 组件的 Redis 队列客户端,功能最小化
4.9.0
2024-01-04 21:38 UTC
Requires
- php: >=7.4
- ext-curl: *
- ext-redis: *
- byjg/message-queue-client: 4.9.*
Requires (Dev)
- phpunit/phpunit: 5.7.*|7.4.*|^9.5
README
它通过使用 byjg/message-queue-client 组件,为使用 Redis 发送和接收消息创建了一个简单的抽象层。
有关如何使用消息队列客户端的详细信息,请参阅 文档
用法
发布
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(RedisQueueConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Create a message $message = new Message("Hello World"); // Publish the message into the queue $connector->publish(new Envelope($pipe, $message));
消费
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(RedisQueueConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("redis://$user:$pass@$host:$port")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Connect to the queue and wait to consume the message $connector->consume( $pipe, // Queue name function (Envelope $envelope) { // Callback function to process the message echo "Process the message"; echo $envelope->getMessage()->getBody(); return Message::ACK; }, function (Envelope $envelope, $ex) { // Callback function to process the failed message echo "Process the failed message"; echo $ex->getMessage(); return Message::REQUEUE; } );
消费方法将等待消息并调用回调函数来处理该消息。如果队列中没有消息,该方法将等待直到有消息到达。
如果您想退出消费方法,只需从回调函数中返回 Message::ACK | Message::EXIT
。
回调函数可能的返回值
Message::ACK
- 确认消息并从队列中删除Message::NACK
- 不确认消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。Message::REQUEUE
- 重新入队消息Message::EXIT
- 退出消费方法