vasek-purchart / rabbit-mq-consumer-handler-bundle
以安全和有效的方式处理 RabbitMQ 消费者中的消息
Requires
- php: ~7.2
- consistence/consistence: ~1.0|~2.0
- doctrine/orm: ~2.6
- php-amqplib/rabbitmq-bundle: ~1.14
- psr/log: ~1.0
- symfony/config: ~3.3|~4.0
- symfony/dependency-injection: ~3.3|~4.0
- symfony/http-kernel: ~3.3|~4.0
- symfony/yaml: ~3.3|~4.0
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-12 08:42:59 UTC
README
以安全和有效的方式处理 RabbitMQ 消费者中的消息
注意: 此包假定您正在使用 RabbitMqBundle
消息队列消费者通常需要长时间运行的过程,在它们终止之前应该处理许多不同的消息。在理想情况下,它们将无限期地运行。但由于我们并不生活在一个理想的世界,错误不可避免地会发生。这些通常包括
- 预期的应用程序异常,
- 意外的应用程序异常,
- 其他异常和错误,如连接中断等,
- 内存泄漏和其他意外行为。
此包的目的是封装对这些状态的处理器,自动化可以自动化的那些,并提供舒适的方式来处理剩余的那些。
此包可以自动处理
- 在未捕获的异常发生时停止消费者(以便它可以安全地重新启动)
- 记录未捕获的异常,
- 在处理消息之前清除 Doctrine EntityManager,
- 当 Doctrine EntityManager 关闭时停止消费者。
用法
为了获得自动化处理的全部好处,您只需要通过 ConsumerHandler
运行消息处理,因此标准消费者可以看起来像这样
<?php declare(strict_types = 1); namespace Example; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler; class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface { /** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */ private $consumerHandler; public function __construct( ConsumerHandler $consumerHandler ) { $this->consumerHandler = $consumerHandler; } public function execute(AMQPMessage $message): int { return $this->consumerHandler->processMessage(function () use ($message): int { $data = $message->body; // do your magic with $data, basically anything you would put in the consumer // without this bundle, apart from the stuff this bundle handles automatically return ConsumerInterface::MSG_ACK; }); } }
此包将为您的每个消费者创建一个 ConsumerHandler
实例,因为它需要访问特定的 OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface
实例,它需要控制从配置队列中消费消息。
假设您的消费者在 old_sound_rabbit_mq
配置中称为 example
,则 vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example
服务将被准备,因此您只需将此实例传递给您的消费者即可
old_sound_rabbit_mq: consumers: example: callback: 'Example\ExampleConsumer' # ... services: Example\ExampleConsumer: arguments: $consumerHandler: '@vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example'
重启消费者
对于消费者,您通常需要一些东西来在它们失败时保持它们运行。这通常是通过某种类型的守护进程(例如 supervisord
)实现的,它会为您运行消费者,监视它们是否正在运行,并在它们未运行时根据您的配置重新启动它们。
为了使工具能够可靠地重新启动消费者,它首先需要能够确定消费者是否已经 启动,这样它就不会陷入启动循环。通常的配置是在无法正确启动后尝试几次后不再启动程序。在 suprvisord
中,程序在 startsecs
指定的时间后被认为是启动的。
此包确保,每次消费者由于未捕获的异常或错误而关闭时,消费者都至少运行了这么长时间,它会通过 stop_consumer_sleep_seconds
睡眠,该值应配置与 startsecs
相同。这意味着该行为可以通过 supervisord
正确处理 - 当它因处理消息而失败时重新启动消费者,但不会无限期地重新启动它,如果应用程序甚至无法启动。
异常处理
以下是一些您可能会遇到的常见情况示例
<?php declare(strict_types = 1); namespace Example; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LogLevel; use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler; class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface { /** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */ private $consumerHandler; public function __construct( ConsumerHandler $consumerHandler ) { $this->consumerHandler = $consumerHandler; } public function execute(AMQPMessage $message): int { return $this->consumerHandler->processMessage( function (ConsumerHandler $consumerHandler) use ($message): int { // the correct ConsumerHandler is passed into the callback, // so you can use it for custom logging etc try { $data = $message->body; // ... return ConsumerInterface::MSG_ACK; } catch (\ResourceNotFound $e) { // might be cause by the asynchronous nature of message queues // - a resource might not yet be accessible // or it might have been deleted already // basically you can choose if this is OK (ACK or REJECT, // depending on semantics), or if you want to try later // again (REJECT_REQUEUE) return ConsumerInterface::REJECT; } catch (\UnexpectedBusinessLogicException $e) { // situation which you are not sure, why it happens, // but you need to investigate further (perhaps with more logging) // and perhaps throw away these messages, because // it might clutter the queue $consumerHandler->log(LogLevel::ERROR, 'My custom message'); $consumerHandler->logException($e); return ConsumerInterface::MSG_REJECT; } catch (\UnexpectedException $e) { // situation where you might need to decide further // what to do in the catch block if ($e->getCode() === 123) { return ConsumerInterface::MSG_REJECT; } throw $e; // handle with default "catchall" } catch (\ExpectedBusinessLogicException $e) { // situation where you can solve it in a different way // call a service return ConsumerInterface::MSG_ACK; } catch (\ConnectionTimeoutCustomException $e) { // situation where the application would need a restart // to reinitialize for example a connection // this would happen also by default in the "catchall", // but you might want to handle a specific case separately, // for example not to log these exceptions // this will stop the consumer $consumerHandler->stopConsumer('Connection timeout'); return ConsumerInterface::MSG_REJECT_REQUEUE; } } ); } }
配置
配置结构及其默认值列表
# config/packages/rabbit_mq_consumer_handler.yml rabbit_mq_consumer_handler: # Generally how long is needed for the program to run, to be considered started, # achieved by sleeping when stopping prematurely stop_consumer_sleep_seconds: 1 logger: # Logger service ID, which instance will be used to log messages and exceptions service_id: 'logger' entity_manager: # EntityManager service ID, which instance is used withing the consumer service_id: 'doctrine.orm.default_entity_manager' # Clear EntityManager before processing message clear_em_before_message: true consumers: # configuration specifically for this consumer <my_consumer_name>: # identical structure as the options above
安装
使用 vasek-purchart/rabbit-mq-consumer-handler-bundle
通过 Composer 安装软件包
composer require vasek-purchart/rabbit-mq-consumer-handler-bundle
在您的应用程序内核中注册该捆绑包
// config/bundles.php return [ // ... VasekPurchart\RabbitMqConsumerHandlerBundle\RabbitMqConsumerHandlerBundle::class => ['all' => true], ];