mhujer/rabbit-mq-consumer-handler-bundle

以安全有效的方式处理RabbitMQ消费者的消息

1.3.1 2022-01-28 15:40 UTC

This package is auto-updated.

Last update: 2024-08-28 21:33:44 UTC


README

此包是基于vasek-purchart/rabbit-mq-consumer-handler-bundle的分支,为更新的PHP和Symfony版本进行了维护。

以安全有效的方式处理RabbitMQ消费者的消息

注意:此bundle期望您使用RabbitMqBundle

消息队列消费者通常需要长时间运行的过程,在终止之前应该处理许多不同的消息。在一个理想的情况下,它们将无限期地运行。但是,因为我们并不生活在一个理想的世界,错误不可避免地会发生。这些通常可以是

  • 预期的应用程序异常,
  • 意外的应用程序异常,
  • 其他异常和错误,如连接中断等,
  • 内存泄漏和其他意外行为。

此bundle的目的就是封装这些状态的处理,自动化可以自动化的那些,并为剩余的提供舒适的处理方式。

此bundle可以自动处理

  • 在未捕获的异常时停止消费者(以便可以安全地重新启动)
  • 记录未捕获的异常,
  • 在处理消息之前清除Doctrine EntityManager,
  • 当Doctrine EntityManager关闭时停止消费者。

用法

为了获得自动处理的全部好处,您只需要通过ConsumerHandler运行消息处理,因此标准消费者可以看起来像这样

<?php

declare(strict_types = 1);

namespace Example;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;

class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
{

	/** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */
	private $consumerHandler;

	public function __construct(
		ConsumerHandler $consumerHandler
	)
	{
		$this->consumerHandler = $consumerHandler;
	}

	public function execute(AMQPMessage $message): int
	{
		return $this->consumerHandler->processMessage(function () use ($message): int {
			$data = $message->body;

			// do your magic with $data, basically anything you would put in the consumer
			// without this bundle, apart from the stuff this bundle handles automatically 

			return ConsumerInterface::MSG_ACK;
		});
	}

}

此bundle将为您的每个消费者创建ConsumerHandler实例,因为它需要访问特定的OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface实例,它需要控制从配置的队列中消费消息。

假设您的消费者在old_sound_rabbit_mq配置中名为example,则将准备vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example服务,因此您可以只需将此实例传递给您的消费者

old_sound_rabbit_mq:
    consumers:
        example:
            callback: 'Example\ExampleConsumer'
            # ...

services:
    Example\ExampleConsumer:
        arguments:
            $consumerHandler: '@vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example'

重启消费者

对于消费者,您通常需要一些东西来在它们失败时保持它们运行。这通常通过某种类型的守护进程(例如 supervisord)来实现,它将为您运行消费者,监控它们是否正在运行,并在它们未运行时根据您的配置重新启动它们。

为了使工具能够可靠地重新启动消费者,它首先需要能够确定消费者是否已经启动,这样它就不会陷入启动循环。通常的配置是在多次重试后无法正确启动程序时不再重新启动程序。在suprvisord中,程序在startsecs秒后被认为是启动的。

此bundle确保,当消费者由于未捕获的异常或错误而关闭时,消费者已经通过stop_consumer_sleep_seconds休眠了至少这么多时间,该值应配置为与startsecs相同。这意味着supervisord可以正确处理行为——在由于处理消息而失败时重新启动消费者,但如果应用程序甚至无法启动则不会无限期地重新启动它。

处理异常

以下是一些您可能会遇到的最常见情况的示例

<?php

declare(strict_types = 1);

namespace Example;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LogLevel;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;

class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
{

	/** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */
	private $consumerHandler;

	public function __construct(
		ConsumerHandler $consumerHandler
	)
	{
		$this->consumerHandler = $consumerHandler;
	}

	public function execute(AMQPMessage $message): int
	{
		return $this->consumerHandler->processMessage(
			function (ConsumerHandler $consumerHandler) use ($message): int {
				// the correct ConsumerHandler is passed into the callback,
				// so you can use it for custom logging etc

				try {
					$data = $message->body;

					// ... 

					return ConsumerInterface::MSG_ACK;

				} catch (\ResourceNotFound $e) {
					// might be cause by the asynchronous nature of message queues
					// - a resource might not yet be accessible
					// or it might have been deleted already

					// basically you can choose if this is OK (ACK or REJECT,
					// depending on semantics), or if you want to try later
					// again (REJECT_REQUEUE)

					return ConsumerInterface::REJECT;

				} catch (\UnexpectedBusinessLogicException $e) {
					// situation which you are not sure, why it happens,
					// but you need to investigate further (perhaps with more logging)
					// and perhaps throw away these messages, because
					// it might clutter the queue

					$consumerHandler->log(LogLevel::ERROR, 'My custom message');
					$consumerHandler->logException($e);

					return ConsumerInterface::MSG_REJECT;

				} catch (\UnexpectedException $e) {
					// situation where you might need to decide further
					// what to do in the catch block

					if ($e->getCode() === 123) {
						return ConsumerInterface::MSG_REJECT;
					}

					throw $e; // handle with default "catchall"

				} catch (\ExpectedBusinessLogicException $e) {
					// situation where you can solve it in a different way

					// call a service

					return ConsumerInterface::MSG_ACK;

				} catch (\ConnectionTimeoutCustomException $e) {
					// situation where the application would need a restart
					// to reinitialize for example a connection

					// this would happen also by default in the "catchall",
					// but you might want to handle a specific case separately,
					// for example not to log these exceptions

					// this will stop the consumer
					$consumerHandler->stopConsumer('Connection timeout');

					return ConsumerInterface::MSG_REJECT_REQUEUE;

				}
			}
		);
	}

}

配置

配置结构,并列出默认值

# config/packages/rabbit_mq_consumer_handler.yml
rabbit_mq_consumer_handler:
    # Generally how long is needed for the program to run, to be considered started,
    # achieved by sleeping when stopping prematurely
    stop_consumer_sleep_seconds: 1
    logger:
        # Logger service ID, which instance will be used to log messages and exceptions
        service_id: 'logger'
    entity_manager:
        # EntityManager service ID, which instance is used withing the consumer
        service_id: 'doctrine.orm.default_entity_manager'
        # Clear EntityManager before processing message
        clear_em_before_message: true
    consumers:
        # configuration specifically for this consumer
        <my_consumer_name>:
            # identical structure as the options above

安装

使用Composer安装包mhujer/rabbit-mq-consumer-handler-bundle

composer require mhujer/rabbit-mq-consumer-handler-bundle

在您的应用程序内核中注册此包

// config/bundles.php
return [
	// ...
	VasekPurchart\RabbitMqConsumerHandlerBundle\RabbitMqConsumerHandlerBundle::class => ['all' => true],
];