vasek-purchart/rabbit-mq-consumer-handler-bundle

以安全和有效的方式处理 RabbitMQ 消费者中的消息

1.2 2018-12-28 13:02 UTC

README

以安全和有效的方式处理 RabbitMQ 消费者中的消息

注意: 此包假定您正在使用 RabbitMqBundle

消息队列消费者通常需要长时间运行的过程,在它们终止之前应该处理许多不同的消息。在理想情况下,它们将无限期地运行。但由于我们并不生活在一个理想的世界,错误不可避免地会发生。这些通常包括

  • 预期的应用程序异常,
  • 意外的应用程序异常,
  • 其他异常和错误,如连接中断等,
  • 内存泄漏和其他意外行为。

此包的目的是封装对这些状态的处理器,自动化可以自动化的那些,并提供舒适的方式来处理剩余的那些。

此包可以自动处理

  • 在未捕获的异常发生时停止消费者(以便它可以安全地重新启动)
  • 记录未捕获的异常,
  • 在处理消息之前清除 Doctrine EntityManager,
  • 当 Doctrine EntityManager 关闭时停止消费者。

用法

为了获得自动化处理的全部好处,您只需要通过 ConsumerHandler 运行消息处理,因此标准消费者可以看起来像这样

<?php

declare(strict_types = 1);

namespace Example;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;

class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
{

	/** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */
	private $consumerHandler;

	public function __construct(
		ConsumerHandler $consumerHandler
	)
	{
		$this->consumerHandler = $consumerHandler;
	}

	public function execute(AMQPMessage $message): int
	{
		return $this->consumerHandler->processMessage(function () use ($message): int {
			$data = $message->body;

			// do your magic with $data, basically anything you would put in the consumer
			// without this bundle, apart from the stuff this bundle handles automatically 

			return ConsumerInterface::MSG_ACK;
		});
	}

}

此包将为您的每个消费者创建一个 ConsumerHandler 实例,因为它需要访问特定的 OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface 实例,它需要控制从配置队列中消费消息。

假设您的消费者在 old_sound_rabbit_mq 配置中称为 example,则 vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example 服务将被准备,因此您只需将此实例传递给您的消费者即可

old_sound_rabbit_mq:
    consumers:
        example:
            callback: 'Example\ExampleConsumer'
            # ...

services:
    Example\ExampleConsumer:
        arguments:
            $consumerHandler: '@vasek_purchart.rabbit_mq_consumer_handler.consumer_handler.id.example'

重启消费者

对于消费者,您通常需要一些东西来在它们失败时保持它们运行。这通常是通过某种类型的守护进程(例如 supervisord)实现的,它会为您运行消费者,监视它们是否正在运行,并在它们未运行时根据您的配置重新启动它们。

为了使工具能够可靠地重新启动消费者,它首先需要能够确定消费者是否已经 启动,这样它就不会陷入启动循环。通常的配置是在无法正确启动后尝试几次后不再启动程序。在 suprvisord 中,程序在 startsecs 指定的时间后被认为是启动的。

此包确保,每次消费者由于未捕获的异常或错误而关闭时,消费者都至少运行了这么长时间,它会通过 stop_consumer_sleep_seconds 睡眠,该值应配置与 startsecs 相同。这意味着该行为可以通过 supervisord 正确处理 - 当它因处理消息而失败时重新启动消费者,但不会无限期地重新启动它,如果应用程序甚至无法启动。

异常处理

以下是一些您可能会遇到的常见情况示例

<?php

declare(strict_types = 1);

namespace Example;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LogLevel;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;

class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
{

	/** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */
	private $consumerHandler;

	public function __construct(
		ConsumerHandler $consumerHandler
	)
	{
		$this->consumerHandler = $consumerHandler;
	}

	public function execute(AMQPMessage $message): int
	{
		return $this->consumerHandler->processMessage(
			function (ConsumerHandler $consumerHandler) use ($message): int {
				// the correct ConsumerHandler is passed into the callback,
				// so you can use it for custom logging etc

				try {
					$data = $message->body;

					// ... 

					return ConsumerInterface::MSG_ACK;

				} catch (\ResourceNotFound $e) {
					// might be cause by the asynchronous nature of message queues
					// - a resource might not yet be accessible
					// or it might have been deleted already

					// basically you can choose if this is OK (ACK or REJECT,
					// depending on semantics), or if you want to try later
					// again (REJECT_REQUEUE)

					return ConsumerInterface::REJECT;

				} catch (\UnexpectedBusinessLogicException $e) {
					// situation which you are not sure, why it happens,
					// but you need to investigate further (perhaps with more logging)
					// and perhaps throw away these messages, because
					// it might clutter the queue

					$consumerHandler->log(LogLevel::ERROR, 'My custom message');
					$consumerHandler->logException($e);

					return ConsumerInterface::MSG_REJECT;

				} catch (\UnexpectedException $e) {
					// situation where you might need to decide further
					// what to do in the catch block

					if ($e->getCode() === 123) {
						return ConsumerInterface::MSG_REJECT;
					}

					throw $e; // handle with default "catchall"

				} catch (\ExpectedBusinessLogicException $e) {
					// situation where you can solve it in a different way

					// call a service

					return ConsumerInterface::MSG_ACK;

				} catch (\ConnectionTimeoutCustomException $e) {
					// situation where the application would need a restart
					// to reinitialize for example a connection

					// this would happen also by default in the "catchall",
					// but you might want to handle a specific case separately,
					// for example not to log these exceptions

					// this will stop the consumer
					$consumerHandler->stopConsumer('Connection timeout');

					return ConsumerInterface::MSG_REJECT_REQUEUE;

				}
			}
		);
	}

}

配置

配置结构及其默认值列表

# config/packages/rabbit_mq_consumer_handler.yml
rabbit_mq_consumer_handler:
    # Generally how long is needed for the program to run, to be considered started,
    # achieved by sleeping when stopping prematurely
    stop_consumer_sleep_seconds: 1
    logger:
        # Logger service ID, which instance will be used to log messages and exceptions
        service_id: 'logger'
    entity_manager:
        # EntityManager service ID, which instance is used withing the consumer
        service_id: 'doctrine.orm.default_entity_manager'
        # Clear EntityManager before processing message
        clear_em_before_message: true
    consumers:
        # configuration specifically for this consumer
        <my_consumer_name>:
            # identical structure as the options above

安装

使用 vasek-purchart/rabbit-mq-consumer-handler-bundle 通过 Composer 安装软件包

composer require vasek-purchart/rabbit-mq-consumer-handler-bundle

在您的应用程序内核中注册该捆绑包

// config/bundles.php
return [
	// ...
	VasekPurchart\RabbitMqConsumerHandlerBundle\RabbitMqConsumerHandlerBundle::class => ['all' => true],
];