lukasz93p / async-message-channel
允许添加并处理异步消息的包。目前基于RabbitMQ。
1.1
2019-12-14 10:58 UTC
Requires
- php-amqplib/php-amqplib: ^2.10
- psr/log: ^1.1
Requires (Dev)
- phpunit/phpunit: ^8.4
This package is auto-updated.
Last update: 2024-09-14 21:43:31 UTC
README
当前实现使用RabbitMQ,因此要使用它,您必须安装此代理。
为什么使用此包?
- 它抽象了与异步消息发布和处理相关的所有困难。
- 它保证消息可靠发布 - 在发布失败的情况下,即使只有一个消息抛出异常。
- 它有助于将未成功处理的消息送回队列。
如何使用
发布
1 添加环境配置
MQ_BROKER_HOST={rabbit host}
MQ_BROKER_PORT={rabbit port}
MQ_BROKER_USER={rabbit user}
MQ_BROKER_PASSWORD={rabbit password}
2 创建 AsynchronousMessageChannel 实例
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
Logger 是实现 Psr\Log\LoggerInterface 类的实例,它将用于记录在消息处理过程中可能发生的错误。
3 发布消息
消息必须是实现 PublishableMessage 类的实例。目前实现与RabbitMQ要求一致,因此 PublishableMessage 定义了三个方法
body- 返回消息体作为字符串routingKey- 返回由RabbitMQ用于将消息路由到正确队列的路由键exchangeName- 返回用于消息发布的RabbitMQ交换机名称
您可以使用默认实现
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface); $publishableMessage = BasicMessage::publishable($routingKeyForMessage, $exchangeNameForMessage, $messageBody); $asynchronousMessageChannel->add([$publishableMessage]);
AsynchronousMessageChannel::add 接收 PublishableMessage 数组,并以可靠的方式批量发布它们。如果从RabbitMQ收到发布失败的信息,则抛出 MessagePublishingFailed。
处理
1 创建 MessageHandler
对于消息处理,需要实现 MessageHandler,该接口仅定义了一个方法
interface MessageHandler { /** * @param ProcessableMessage $message * @throws Throwable * @throws MessageConstantlyUnprocessable * @throws MessageTemporaryUnprocessable */ public function handle(ProcessableMessage $message): void; }
如您所见,handle 接收 ProcessableMessage 作为唯一的参数。
interface ProcessableMessage { public function body(): string; }
2 在 MessageHandler::handle 中决定如何处理已处理的消息
客户端代码可以通过实现 handle 方法来决定如何处理已处理的消息
- 如果
handle方法抛出任何异常/可抛出物(除MessageConstantlyUnprocessable之外),则AsynchronousMessageChannel将从RabbitMQ中reject消息。当消息被拒绝时,它将从队列中删除,但您可以将RabbitMQ配置为使用fallback queue,并指示代理将消息从fallback queue传递回任何其他队列(可能是原始添加消息的队列),并有一定的延迟。 - 如果
handle抛出MessageConstantlyUnprocessable,则AsynchronousMessageChannel通知RabbitMQ消息已成功处理,然后RabbitMQ简单地删除消息。
总结
MessageHandler::handle抛出的所有异常都将被记录。- 如果消息已成功处理,则
MessageHandler::handle不应抛出任何异常。 - 如果消息处理失败,但您 不希望 再次接收该消息并记录异常,则在
MessageHandler::handle中抛出MessageConstantlyUnprocessable。 - 如果消息处理失败,但您 希望 再次接收该消息并记录异常,则在
MessageHandler::handle中抛出任何异常(您可以更明确地抛出MessageTemporaryUnprocessable)。要再次接收消息,您还必须为RabbitMQ配置fallback queue。
3 开始处理消息
要开始处理消息,您需要一个 AsynchronousMessageChannel 实例
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
然后您应该使用 AsynchronousMessageChannel::startProcessingQueue
$asynchronousMessageChannel->startProcessingQueue($myImplementationOfMessageHandler, $nameOfRabbitMQQueueFromWhichMessagesWillBeProcessed);