lukasz93p / async-message-channel

允许添加并处理异步消息的包。目前基于RabbitMQ。

1.1 2019-12-14 10:58 UTC

This package is auto-updated.

Last update: 2024-09-14 21:43:31 UTC


README

当前实现使用RabbitMQ,因此要使用它,您必须安装此代理。

为什么使用此包?

  • 它抽象了与异步消息发布和处理相关的所有困难。
  • 它保证消息可靠发布 - 在发布失败的情况下,即使只有一个消息抛出异常。
  • 它有助于将未成功处理的消息送回队列。

如何使用

发布

1 添加环境配置

MQ_BROKER_HOST={rabbit host}
MQ_BROKER_PORT={rabbit port}
MQ_BROKER_USER={rabbit user}
MQ_BROKER_PASSWORD={rabbit password}

2 创建 AsynchronousMessageChannel 实例

$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);

Logger 是实现 Psr\Log\LoggerInterface 类的实例,它将用于记录在消息处理过程中可能发生的错误。

3 发布消息

消息必须是实现 PublishableMessage 类的实例。目前实现与RabbitMQ要求一致,因此 PublishableMessage 定义了三个方法

  • body - 返回消息体作为字符串
  • routingKey - 返回由RabbitMQ用于将消息路由到正确队列的路由键
  • exchangeName - 返回用于消息发布的RabbitMQ交换机名称

您可以使用默认实现

$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
$publishableMessage = BasicMessage::publishable($routingKeyForMessage, $exchangeNameForMessage, $messageBody);
$asynchronousMessageChannel->add([$publishableMessage]);

AsynchronousMessageChannel::add 接收 PublishableMessage 数组,并以可靠的方式批量发布它们。如果从RabbitMQ收到发布失败的信息,则抛出 MessagePublishingFailed

处理

1 创建 MessageHandler

对于消息处理,需要实现 MessageHandler,该接口仅定义了一个方法

interface MessageHandler
{
    /**
     * @param ProcessableMessage $message
     * @throws Throwable
     * @throws MessageConstantlyUnprocessable
     * @throws MessageTemporaryUnprocessable
     */
    public function handle(ProcessableMessage $message): void;
}

如您所见,handle 接收 ProcessableMessage 作为唯一的参数。

interface ProcessableMessage
{
    public function body(): string;
}

2 在 MessageHandler::handle 中决定如何处理已处理的消息

客户端代码可以通过实现 handle 方法来决定如何处理已处理的消息

  • 如果 handle 方法抛出任何异常/可抛出物(除 MessageConstantlyUnprocessable 之外),则 AsynchronousMessageChannel 将从RabbitMQ中 reject 消息。当消息被拒绝时,它将从队列中删除,但您可以将RabbitMQ配置为使用 fallback queue,并指示代理将消息从 fallback queue 传递回任何其他队列(可能是原始添加消息的队列),并有一定的延迟。
  • 如果 handle 抛出 MessageConstantlyUnprocessable,则 AsynchronousMessageChannel 通知RabbitMQ消息已成功处理,然后RabbitMQ简单地删除消息。
总结
  • MessageHandler::handle 抛出的所有异常都将被记录。
  • 如果消息已成功处理,则 MessageHandler::handle 不应抛出任何异常。
  • 如果消息处理失败,但您 不希望 再次接收该消息并记录异常,则在 MessageHandler::handle 中抛出 MessageConstantlyUnprocessable
  • 如果消息处理失败,但您 希望 再次接收该消息并记录异常,则在 MessageHandler::handle 中抛出任何异常(您可以更明确地抛出 MessageTemporaryUnprocessable)。要再次接收消息,您还必须为RabbitMQ配置 fallback queue

3 开始处理消息

要开始处理消息,您需要一个 AsynchronousMessageChannel 实例

$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);

然后您应该使用 AsynchronousMessageChannel::startProcessingQueue

$asynchronousMessageChannel->startProcessingQueue($myImplementationOfMessageHandler, $nameOfRabbitMQQueueFromWhichMessagesWillBeProcessed);