vascek-purchart/rabbit-mq-database-transaction-producer-bundle

当数据库事务提交时向 RabbitMQ 生产者发布消息

2.0 2018-12-29 13:25 UTC

README

当数据库事务提交时向 RabbitMQ 生产者发布消息

注意:此包期望您正在使用 Doctrine DBAL & ORM BundleRabbitMqBundle

问题

数据库事务 确保一系列操作要么全部“完成”,要么一个也不“完成”。这对于大多数应用程序来说非常重要,因为否则它们的状态就会变得混乱。这就是为什么大多数数据库系统都提供事务(至少在某种程度上)。当您使用多个系统时,会出现新的问题,因为通常没有方法来确保跨所有系统的操作具有事务性。

此包提供了解决方案,以减轻使用 RabbitMQ 和 SQL 数据库(通过 Doctrine)时由此问题引起的最常见情况。SQL 数据库和 RabbitMQ 都有自己的事务,但无法扩展系统间的事务,这可能导致许多错误情况,通常

  1. 您将 ID 发布到 RabbitMQ 队列中,该 ID 应异步处理,但它从未提交到数据库。
  2. 您将 ID 发布到 RabbitMQ 队列中,该 ID 应异步处理,但它尚未提交到数据库。
  3. 一切已提交到数据库,但相关的消息从未发送到队列。

如果您使用嵌套事务,这会更加常见,因为那时仅通过查看“本地”代码很难判断实际何时提交事务。

这正是使用 Doctrine ORM 时的情况,因为即使您调用 flush,您也无法确定是否有未提交的事务包裹在此调用。

此包做什么

此包不声称“解决”问题,因为几乎不可能,但它试图减轻由问题引起的几乎所有实际情况。当向 RabbitMQ 发布消息时,此包将检查数据库连接上是否有打开的事务(包括嵌套),如果没有,则立即发送消息。但是,当它检测到有打开的事务时,则将存储消息,并且仅在所有连接上的事务提交后才会发送消息。

当像以下示例中那样编写代码时,上一节中提到的所有情况都不应引起问题

<?php

use Doctrine\ORM\EntityManager;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface as Producer;

class ImportFacade
{

	/** @var \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface */
	private $importProcessItemRabbitMqProducer;

	/** @var \Doctrine\ORM\EntityManager */
	private $entityManager;

	public function __construct(
		Producer $importProcessItemRabbitMqProducer,
		EntityManager $entityManager
	)
	{
		$this->importProcessItemRabbitMqProducer = $importProcessItemRabbitMqProducer;
		$this->entityManager = $entityManager;
	}

	public function import()
	{
		// $items = ...

		$this->entityManager->transactional(function () use ($items) {
			$this->entityManager->flush();

			foreach ($items as $item) {
				$this->importProcessItemRabbitMqProducer->publish($item->getId());
			}
		});
	}

}

该示例表示一个导入操作,它被拆分为项目,然后可以逐个异步处理。所有持久化相关操作都使用 EntityManager::transactional() 包裹在事务中。首先,刷新 EntityManager,这意味着如果存储数据时出现任何错误,将抛出异常,并且消息永远不会发布到 RabbitMQ。如果 Doctrine 刷新的数据正常,则消息将立即发布或等到所有嵌套事务提交后发布。

这确保了在发布消息之前,基于 RabbitMQ 消息的数据始终存在于数据库中,因此解决了上一节中的前两种情况。

第三种情况——数据已保存到数据库,但RabbitMQ消息从未发布——不幸的是仍然可能发生——这种情况会在提交后和消息发布之间出现应用程序问题。但由于发布逻辑非常简单,且没有涉及业务逻辑,这种情况几乎不会发生,最常见的情况可能是RabbitMQ实例无法访问。

配置

配置结构及默认值列表

# config/packages/rabbit_mq_database_transaction_producer.yaml
rabbit_mq_database_transaction_producer:
    # Whether custom connection class for DBAL is used in the project, see below for details.
    # When this is false, custom connection class from this bundle is used.
    custom_connection_class: false

自定义连接类

Doctrine DBAL无法通过组合添加功能到Doctrine\DBAL\Connection,以便可以从多个来源组合添加的功能。唯一扩展功能的方法是通过扩展原始类,并将Doctrine配置为使用该类而不是使用dbal.wrapper_class配置选项。

如果您已经使用自定义连接实现,您必须确保它实现了VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\Doctrine\Connection\AfterCommitCallbacksConnection,以便与此捆绑包兼容,即实现addAfterCommitCallback方法,并确保回调在事务提交后触发。

如果您没有使用任何自定义实现,此捆绑包将提供实现,该实现添加了回调行为,并在其之上提供由调用的回调引发的异常的日志记录。

服务覆盖

您也可以覆盖内部使用的服务,例如,如果您使用非标准日志记录器,您可以提供带有别名的自定义实例。

services:
    my_logger:
        class: Monolog\Logger
        arguments:
            - 'my_channel'

    rabbit_mq_database_transaction_producer_bundle.logger: @my_logger

安装

使用Composer安装包vasek-purchart/rabbit-mq-database-transaction-producer-bundle

composer require vasek-purchart/rabbit-mq-database-transaction-producer-bundle

在您的应用程序中注册此捆绑包

// config/bundles.php
return [
	// ...
	VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\RabbitMqDatabaseTransactionProducerBundle::class => ['all' => true],
];