byjg / message-queue-client
一种通用且可扩展的轻量级消息客户端,用于从亚马逊SQS、亚马逊SNS、谷歌PubSub、RabbitMQ、Kafka等消息服务器发布和消费消息。
4.9.0
2024-01-04 21:19 UTC
Requires (Dev)
- phpunit/phpunit: 5.7.*|7.4.*|^9.5
README
这是一个简单客户端,用于从消息队列服务器发布和消费消息。
功能
- 低代码发布和消费消息
- 消息、队列和连接器对象解耦
- 易于实现新的连接器
┌─────────────────┐ ┌────────────────────────┐
│ │ │ Envelope │
│ │ │ │
│ │ │ │
│ │ │ ┌─────────────────┐ │
│ │ publish() │ │ Pipe │ │
│ ├─────────────────▶│ └─────────────────┘ │
│ │ │ ┌─────────────────┐ │
│ │ │ │ Message │ │
│ │ │ └─────────────────┘ │
│ │ │ │
│ │ └────────────────────────┘
│ Connector │
│ │
│ │
│ │ consume() ┌─────────────────┐
│ │◀────────────────────│ Pipe │
│ │ └─────────────────┘
│ │
│ │
│ │
└─────────────────┘
已实现的连接器
用法
发布
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(MockConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("mock://local")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Create a message $message = new Message("Hello World"); // Publish the message into the queue $connector->publish(new Envelope($pipe, $message));
消费
<?php // Register the connector and associate with a scheme ConnectorFactory::registerConnector(MockConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("mock://local")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Connect to the queue and wait to consume the message $connector->consume( $pipe, // Queue name function (Envelope $envelope) { // Callback function to process the message echo "Process the message"; echo $envelope->getMessage()->getBody(); return Message::ACK; }, function (Envelope $envelope, $ex) { // Callback function to process the failed message echo "Process the failed message"; echo $ex->getMessage(); return Message::REQUEUE; } );
消费方法将等待消息,并调用回调函数处理消息。如果没有消息在队列中,该方法将等待直到有消息到达。
如果您想退出消费方法,只需从回调函数返回Message::ACK | Message::EXIT
。
回调函数可能的返回值
Message::ACK
- 确认消息并从队列中删除Message::NACK
- 不确认消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。Message::REQUEUE
- 重新入队消息Message::EXIT
- 退出消费方法
消费者客户端
您可以使用ConsumerClientTrait简化消费方法。更多详细信息请参阅docs/consumer-client-trait.md。
连接器
连接器是负责连接到消息队列服务器并发送/接收消息的类。
所有连接器都有以下接口
<?php interface ConnectorInterface { public function setUp(Uri $uri); public function getDriver(); public function publish(Envelope $envelope); public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, $identification = null); }
不需要调用getDriver()
方法,因为publish()
和consume()
方法会自动调用它。只有在您需要直接访问连接时才使用getDriver()
方法。