mhujer / rabbit-mq-consumer-handler-bundle
以安全有效的方式处理RabbitMQ消费者的消息
Requires
- php: ~7.4 || ~8.0
- consistence-community/consistence: ~2.1
- doctrine/orm: ~2.6
- php-amqplib/rabbitmq-bundle: ~2.2
- psr/log: ~1.0
- symfony/config: ~4.4|^5.4|^6.0
- symfony/dependency-injection: ~4.4|^5.4|^6.0
- symfony/http-kernel: ~4.4|^5.4|^6.0
- symfony/yaml: ~4.4|^5.4|^6.0
Requires (Dev)
- consistence-community/coding-standard: 3.11.1
- matthiasnoback/symfony-dependency-injection-test: 4.3.0
- phing/phing: 2.17.1
- php-parallel-lint/php-parallel-lint: 1.3.1
- phpunit/phpunit: 9.5.13
This package is auto-updated.
Last update: 2024-08-28 21:33:44 UTC
README
此包是基于vasek-purchart/rabbit-mq-consumer-handler-bundle
的分支,为更新的PHP和Symfony版本进行了维护。
以安全有效的方式处理RabbitMQ消费者的消息
注意:此bundle期望您使用RabbitMqBundle
消息队列消费者通常需要长时间运行的过程,在终止之前应该处理许多不同的消息。在一个理想的情况下,它们将无限期地运行。但是,因为我们并不生活在一个理想的世界,错误不可避免地会发生。这些通常可以是
- 预期的应用程序异常,
- 意外的应用程序异常,
- 其他异常和错误,如连接中断等,
- 内存泄漏和其他意外行为。
此bundle的目的就是封装这些状态的处理,自动化可以自动化的那些,并为剩余的提供舒适的处理方式。
此bundle可以自动处理
- 在未捕获的异常时停止消费者(以便可以安全地重新启动)
- 记录未捕获的异常,
- 在处理消息之前清除Doctrine EntityManager,
- 当Doctrine EntityManager关闭时停止消费者。
用法
为了获得自动处理的全部好处,您只需要通过ConsumerHandler
运行消息处理,因此标准消费者可以看起来像这样
<?php declare(strict_types = 1); namespace Example; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler; class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface { /** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */ private $consumerHandler; public function __construct( ConsumerHandler $consumerHandler ) { $this->consumerHandler = $consumerHandler; } public function execute(AMQPMessage $message): int { return $this->consumerHandler->processMessage(function () use ($message): int { $data = $message->body; // do your magic with $data, basically anything you would put in the consumer // without this bundle, apart from the stuff this bundle handles automatically return ConsumerInterface::MSG_ACK; }); } }
此bundle将为您的每个消费者创建ConsumerHandler
实例,因为它需要访问特定的OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface
实例,它需要控制从配置的队列中消费消息。
假设您的消费者在old_sound_rabbit_mq
配置中名为example
,则将准备vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example
服务,因此您可以只需将此实例传递给您的消费者
old_sound_rabbit_mq: consumers: example: callback: 'Example\ExampleConsumer' # ... services: Example\ExampleConsumer: arguments: $consumerHandler: '@vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example'
重启消费者
对于消费者,您通常需要一些东西来在它们失败时保持它们运行。这通常通过某种类型的守护进程(例如 supervisord
)来实现,它将为您运行消费者,监控它们是否正在运行,并在它们未运行时根据您的配置重新启动它们。
为了使工具能够可靠地重新启动消费者,它首先需要能够确定消费者是否已经启动,这样它就不会陷入启动循环。通常的配置是在多次重试后无法正确启动程序时不再重新启动程序。在suprvisord
中,程序在startsecs
秒后被认为是启动的。
此bundle确保,当消费者由于未捕获的异常或错误而关闭时,消费者已经通过stop_consumer_sleep_seconds
休眠了至少这么多时间,该值应配置为与startsecs
相同。这意味着supervisord
可以正确处理行为——在由于处理消息而失败时重新启动消费者,但如果应用程序甚至无法启动则不会无限期地重新启动它。
处理异常
以下是一些您可能会遇到的最常见情况的示例
<?php declare(strict_types = 1); namespace Example; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LogLevel; use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler; class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface { /** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */ private $consumerHandler; public function __construct( ConsumerHandler $consumerHandler ) { $this->consumerHandler = $consumerHandler; } public function execute(AMQPMessage $message): int { return $this->consumerHandler->processMessage( function (ConsumerHandler $consumerHandler) use ($message): int { // the correct ConsumerHandler is passed into the callback, // so you can use it for custom logging etc try { $data = $message->body; // ... return ConsumerInterface::MSG_ACK; } catch (\ResourceNotFound $e) { // might be cause by the asynchronous nature of message queues // - a resource might not yet be accessible // or it might have been deleted already // basically you can choose if this is OK (ACK or REJECT, // depending on semantics), or if you want to try later // again (REJECT_REQUEUE) return ConsumerInterface::REJECT; } catch (\UnexpectedBusinessLogicException $e) { // situation which you are not sure, why it happens, // but you need to investigate further (perhaps with more logging) // and perhaps throw away these messages, because // it might clutter the queue $consumerHandler->log(LogLevel::ERROR, 'My custom message'); $consumerHandler->logException($e); return ConsumerInterface::MSG_REJECT; } catch (\UnexpectedException $e) { // situation where you might need to decide further // what to do in the catch block if ($e->getCode() === 123) { return ConsumerInterface::MSG_REJECT; } throw $e; // handle with default "catchall" } catch (\ExpectedBusinessLogicException $e) { // situation where you can solve it in a different way // call a service return ConsumerInterface::MSG_ACK; } catch (\ConnectionTimeoutCustomException $e) { // situation where the application would need a restart // to reinitialize for example a connection // this would happen also by default in the "catchall", // but you might want to handle a specific case separately, // for example not to log these exceptions // this will stop the consumer $consumerHandler->stopConsumer('Connection timeout'); return ConsumerInterface::MSG_REJECT_REQUEUE; } } ); } }
配置
配置结构,并列出默认值
# config/packages/rabbit_mq_consumer_handler.yml rabbit_mq_consumer_handler: # Generally how long is needed for the program to run, to be considered started, # achieved by sleeping when stopping prematurely stop_consumer_sleep_seconds: 1 logger: # Logger service ID, which instance will be used to log messages and exceptions service_id: 'logger' entity_manager: # EntityManager service ID, which instance is used withing the consumer service_id: 'doctrine.orm.default_entity_manager' # Clear EntityManager before processing message clear_em_before_message: true consumers: # configuration specifically for this consumer <my_consumer_name>: # identical structure as the options above
安装
使用Composer安装包mhujer/rabbit-mq-consumer-handler-bundle
composer require mhujer/rabbit-mq-consumer-handler-bundle
在您的应用程序内核中注册此包
// config/bundles.php return [ // ... VasekPurchart\RabbitMqConsumerHandlerBundle\RabbitMqConsumerHandlerBundle::class => ['all' => true], ];