run-as-root / magento2-message-queue-retry
通过 RabbitMQ 的死信交换提供消息队列重试处理功能。
Requires
- php: ^8.1
- magento/framework: ~103.0.5
- magento/framework-amqp: ~100.4.3
- magento/module-backend: ~102.0.5
- magento/module-config: ~101.2.5
- magento/module-ui: ~101.2.5
- php-amqplib/php-amqplib: ^v3.2.0
Requires (Dev)
- bitexpert/phpstan-magento: ^0.29.0
- magento/magento-coding-standard: *
- pdepend/pdepend: ^2.13
- phpcompatibility/php-compatibility: ^9.3
- phpmd/phpmd: ^2.13
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ~9.5.20
- sebastian/phpcpd: ^6.0
- slevomat/coding-standard: ^8.8
- squizlabs/php_codesniffer: ~3.7.0
README
run-as-root/magento2-message-queue-retry
它提供了利用 RabbitMQ 的 死信交换 功能重复处理相同队列消息的可能性。
目录
先决条件
- Magento 2.4.5 或更高版本
- PHP 8.1 或更高版本
- RabbitMQ
为了使用此模块,您必须手动通过 queue_topology.xml
文件为想要启用重试机制的队列配置死信交换(s)。示例将在 配置 部分给出。
其他必要条件是您的交换必须有一个从交换到仅一个主题和队列的关系,
例如
安装
通过 composer 安装模块
composer require run-as-root/magento2-message-queue-retry
启用模块
bin/magento module:enable RunAsRoot_MessageQueueRetry
功能
- 切换激活
- 配置队列的重试限制
- 管理达到重试限制的消息的行政网格
- 将失败的消息重新入队到其原始队列
- 删除消息
- 下载消息体
工作原理
默认情况下,当在消费者执行期间抛出异常时,Magento 的消费者行为是拒绝消息。如果您为队列使用标准配置(没有死信交换),则消息将被丢弃,不再处理。
此模块将改变一点这种行为。它将引入一个额外的步骤来检查消息是否达到重试限制,如果是,则从 RabbitMQ 中删除并将消息发送到 run_as_root_queue_error_message
MySQL 表,并通过管理员面板手动管理。
如果消息尚未达到重试限制,则将被拒绝,RabbitMQ 将将其发送到死信交换。消息将被自动路由到“延迟”队列,并保持在那里,直到 TTL 时间到达。TTL 时间到达后,消息将被返回到其原始队列。
下面的图示说明了这两种方法
在管理员面板中,将有一个新的网格来管理达到重试限制的消息
路径:RunAsRoot > 管理消息
网格列
- 主题名称:消息所属的主题名称
- 总重试次数:消息被处理的总次数
- 失败描述:处理消息时抛出的异常消息
- 消息体:原始消息体,可以下载为文件,格式为 JSON。
网格操作
- 重新入队:将消息发送到原始队列
- 下载:将消息体内容作为 JSON 文件下载
网格还具有批量操作来删除或重新入队所选消息。
可以在网格和模块配置中为每个操作配置 ACL。
配置
配置队列的重试需要两个步骤
- 配置死信交换
- 声明重试限制 xml 配置
- 启用消息队列重试管理员配置
1. 配置死信交换器
让我们设想一个场景,假设在您的项目中已经存在名为 erp_order_export
的队列,为了简化示例,主题名称、交换器名称和队列名称相同:erp_order_export
。
我们需要更改这两个文件以声明和配置延迟队列
communication.xml
queue_topology.xml
当前队列配置如下:
etc/communication.xml
:
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd"> <topic name="erp_order_export" request="string"/> </config>
etc/queue_topology.xml
:
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd"> <exchange name="erp_order_export" connection="amqp" type="topic"> <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export"/> </exchange> </config>
您需要将其更改为:
etc/communication.xml
:
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd"> <topic name="erp_order_export" request="string"/> <topic name="erp_order_export_delay" request="string"/> </config>
etc/queue_topology.xml
:
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd"> <exchange name="erp_order_export" connection="amqp" type="topic"> <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export"> <arguments> <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export_delay</argument> <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export_delay</argument> </arguments> </binding> </exchange> <!-- Delay queue --> <exchange name="erp_order_export_delay" connection="amqp" type="topic"> <binding id="erp_order_export_delay" topic="erp_order_export_delay" destinationType="queue" destination="erp_order_export_delay"> <arguments> <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export</argument> <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export</argument> <argument name="x-message-ttl" xsi:type="number">300000</argument> </arguments> </binding> </exchange> </config>
在 erp_order_export
交换器绑定中,我们添加了 x-dead-letter-exchange
和 x-dead-letter-routing-key
参数,这将在消息被拒绝时将消息路由到 erp_order_export_delay
交换器。
我们添加了 erp_order_export_delay
交换器和绑定,它指向原始交换器(erp_order_export
)。x-message-ttl
参数将配置消息在 erp_order_export_delay
队列中停留的时长,在这个示例中为 5 分钟(300000 毫秒)。当生命周期到期(TTL)时,RabbitMQ 将自动将消息发送到 erp_order_export
。
erp_order_export_delay
队列没有消费者,它将仅用于根据 x-message-ttl
参数定义的周期(delay)持有消息。
2. 声明重试限制 XML 配置
创建名为 Vendor_ModuleName/etc/queue_retry.xml
的文件,内容如下
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:RunAsRoot:module:RunAsRoot_MessageQueueRetry:/etc/queue_retry.xsd"> <topic name="erp_order_export" retryLimit="3"/> </config>
3. 启用消息队列重试管理配置
现在您需要切换重试队列模块的激活状态
系统 > 配置 > 以 root 身份运行 > 消息队列重试
注意:配置 消息保留总天数
是消息在数据库中保留的时长。在此期间后,消息将被 Cron 作业自动删除。
run_as_root_clean_old_queue_error_messages
Cron 作业计划每天凌晨 02:00 执行。
跳过重试
如果您的队列配置了重试,但某些场景不需要再次处理消息,只需在异常消息中添加 MESSAGE_QUEUE_SKIP_RETRY
字符串。这样,消息将不会进入重试循环。
重要提示:请确保使用 queue_retry.xml
文件配置队列的重试限制并启用消息队列重试配置。如果您配置了死信交换器,但未执行提到的步骤,消息将进入重试循环。换句话说,它将一直执行,直到消费者处理消息而不会抛出异常。这是 RabbitMQ 死信交换器的默认行为,即使此模块未安装,也会以这种方式工作。
有关如何配置 Magento 2 中的消息队列的更多信息,您可以查看这里。
探索真实场景
如果您想了解更多关于此模块的信息,并探索与之相关的实际场景,请查看我们撰写的博客文章。