mhujer/rabbit-mq-database-transaction-producer-bundle

在数据库事务提交时向 RabbitMQ 生产者发布消息

2.1.1 2022-01-28 15:28 UTC

This package is auto-updated.

Last update: 2024-08-28 20:53:09 UTC


README

此包是 vasek-purchart/rabbit-mq-database-transaction-producer-bundle 的分支,为更新的 PHP 和 Symfony 版本维护。

在数据库事务提交时向 RabbitMQ 生产者发布消息

注意:此包期望您正在使用 Doctrine DBAL & ORM BundleRabbitMqBundle

问题

数据库事务 确保一系列操作要么全部“完成”,要么一个也不“完成”。这对于大多数应用程序来说非常重要,因为否则它们的状态会变得破坏。这就是为什么大多数数据库系统都提供事务(至少在某种程度上)。当使用多个系统时,新问题出现了,因为没有通常的方式可以确保跨越所有系统的操作的事务行为。

此包为解决使用 RabbitMQ 和 SQL 数据库(通过 Doctrine)时由此问题引起的最常见情况提供了解决方案。SQL 数据库和 RabbitMQ 都有自己的事务,但无法扩展系统间的事务,这可能导致许多错误情况,通常

  1. 您向 RabbitMQ 队列发布了一个 ID,该 ID 应异步处理,但它从未提交到数据库。
  2. 您向 RabbitMQ 队列发布了一个 ID,该 ID 应异步处理,但它 尚未 提交到数据库。
  3. 所有内容都已提交到数据库,但伴随的消息从未发送到队列。

如果您正在使用嵌套事务,这种情况更加普遍,因为那时仅通过查看“本地”代码很难判断实际提交事务的时间。

这正是使用 Doctrine ORM 的情况,因为即使您调用 flush,您也不能确定没有打开的事务包围这个调用。

此包的功能

此包并不声称“解决”问题,因为这几乎是不可能的,但它试图缓解由问题引起的大部分实际场景。当向 RabbitMQ 发布消息时,此包会检查数据库连接上是否存在打开的事务(包括嵌套),如果没有,它将立即发送消息。但是,当它检测到存在打开的事务时,它将存储消息,并且只有在所有连接上的事务都提交后才会发送。

以下是一个示例,如何编写代码,以避免上一节中提到的所有情况

<?php

use Doctrine\ORM\EntityManager;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface as Producer;

class ImportFacade
{

	/** @var \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface */
	private $importProcessItemRabbitMqProducer;

	/** @var \Doctrine\ORM\EntityManager */
	private $entityManager;

	public function __construct(
		Producer $importProcessItemRabbitMqProducer,
		EntityManager $entityManager
	)
	{
		$this->importProcessItemRabbitMqProducer = $importProcessItemRabbitMqProducer;
		$this->entityManager = $entityManager;
	}

	public function import()
	{
		// $items = ...

		$this->entityManager->transactional(function () use ($items) {
			$this->entityManager->flush();

			foreach ($items as $item) {
				$this->importProcessItemRabbitMqProducer->publish($item->getId());
			}
		});
	}

}

示例表示一个导入,它被分成项目,然后可以逐个异步处理。所有持久化相关操作都使用 EntityManager::transactional() 包裹在事务中。首先,EntityManager 被刷新,这意味着如果存储数据时出现任何错误,将抛出异常,并且消息永远不会发布到 RabbitMQ。如果 Doctrine 刷新的数据是正常的,那么消息将立即发布或等待所有嵌套事务提交。

这确保了在发布消息之前,基于 RabbitMQ 消息的数据始终存在于数据库中,从而解决了上一节中提到的第一个和第二个情况。

第三种情况——数据已保存到数据库,但RabbitMQ消息从未发布——仍然可能发生,这通常发生在提交后和消息发布之间的应用程序出现问题。但鉴于发布逻辑非常简单,并且没有涉及业务逻辑,这种情况几乎不会发生,最常见的情况可能是RabbitMQ实例不可达。

配置

配置结构及默认值列表

# config/packages/rabbit_mq_database_transaction_producer.yaml
rabbit_mq_database_transaction_producer:
    # Whether custom connection class for DBAL is used in the project, see below for details.
    # When this is false, custom connection class from this bundle is used.
    custom_connection_class: false

自定义连接类

Doctrine DBAL无法通过组合添加功能到Doctrine\DBAL\Connection,以便可以从多个来源组合添加的功能。唯一扩展功能的方法是通过扩展原始类,并配置Doctrine使用该类,而不是使用dbal.wrapper_class配置选项。

如果您已经使用自定义连接实现,您必须确保它实现了VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\Doctrine\Connection\AfterCommitCallbacksConnection,以便与该包兼容,即实现addAfterCommitCallback方法,并确保在事务提交后触发回调。

如果您没有使用任何自定义实现,此包将提供实现,它添加了回调行为,并且还提供由调用回调引发的异常的日志记录。

服务覆盖

您还可以覆盖内部使用的服务,例如,如果您使用非标准日志记录器,您可以使用alias提供自定义实例。

services:
    my_logger:
        class: Monolog\Logger
        arguments:
            - 'my_channel'

    rabbit_mq_database_transaction_producer_bundle.logger: @my_logger

安装

使用Composer安装包mhujer/rabbit-mq-database-transaction-producer-bundle

composer require mhujer/rabbit-mq-database-transaction-producer-bundle

在您的应用程序中注册该包

// config/bundles.php
return [
	// ...
	VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\RabbitMqDatabaseTransactionProducerBundle::class => ['all' => true],
];