mhujer / rabbit-mq-database-transaction-producer-bundle
在数据库事务提交时向 RabbitMQ 生产者发布消息
Requires
- php: ~7.4 || ~8.0
- doctrine/doctrine-bundle: ~2.2
- php-amqplib/rabbitmq-bundle: ~2.2
- psr/log: ~1.0
- symfony/config: ~4.4|^5.4|^6.0
- symfony/dependency-injection: ~4.4|^5.4|^6.0
- symfony/http-kernel: ~4.4|^5.4|^6.0
- symfony/yaml: ~4.4|^5.4|^6.0
Requires (Dev)
- ext-pdo_sqlite: *
- consistence-community/coding-standard: 3.11.1
- doctrine/common: ~3.0
- doctrine/dbal: ~3.0
- matthiasnoback/symfony-dependency-injection-test: 4.3.0
- phing/phing: 2.17.1
- php-parallel-lint/php-parallel-lint: 1.3.1
- phpunit/phpunit: 9.5.13
This package is auto-updated.
Last update: 2024-08-28 20:53:09 UTC
README
此包是 vasek-purchart/rabbit-mq-database-transaction-producer-bundle
的分支,为更新的 PHP 和 Symfony 版本维护。
在数据库事务提交时向 RabbitMQ 生产者发布消息
注意:此包期望您正在使用 Doctrine DBAL & ORM Bundle 和 RabbitMqBundle
问题
数据库事务 确保一系列操作要么全部“完成”,要么一个也不“完成”。这对于大多数应用程序来说非常重要,因为否则它们的状态会变得破坏。这就是为什么大多数数据库系统都提供事务(至少在某种程度上)。当使用多个系统时,新问题出现了,因为没有通常的方式可以确保跨越所有系统的操作的事务行为。
此包为解决使用 RabbitMQ 和 SQL 数据库(通过 Doctrine)时由此问题引起的最常见情况提供了解决方案。SQL 数据库和 RabbitMQ 都有自己的事务,但无法扩展系统间的事务,这可能导致许多错误情况,通常
- 您向 RabbitMQ 队列发布了一个 ID,该 ID 应异步处理,但它从未提交到数据库。
- 您向 RabbitMQ 队列发布了一个 ID,该 ID 应异步处理,但它 尚未 提交到数据库。
- 所有内容都已提交到数据库,但伴随的消息从未发送到队列。
如果您正在使用嵌套事务,这种情况更加普遍,因为那时仅通过查看“本地”代码很难判断实际提交事务的时间。
这正是使用 Doctrine ORM 的情况,因为即使您调用 flush
,您也不能确定没有打开的事务包围这个调用。
此包的功能
此包并不声称“解决”问题,因为这几乎是不可能的,但它试图缓解由问题引起的大部分实际场景。当向 RabbitMQ 发布消息时,此包会检查数据库连接上是否存在打开的事务(包括嵌套),如果没有,它将立即发送消息。但是,当它检测到存在打开的事务时,它将存储消息,并且只有在所有连接上的事务都提交后才会发送。
以下是一个示例,如何编写代码,以避免上一节中提到的所有情况
<?php use Doctrine\ORM\EntityManager; use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface as Producer; class ImportFacade { /** @var \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface */ private $importProcessItemRabbitMqProducer; /** @var \Doctrine\ORM\EntityManager */ private $entityManager; public function __construct( Producer $importProcessItemRabbitMqProducer, EntityManager $entityManager ) { $this->importProcessItemRabbitMqProducer = $importProcessItemRabbitMqProducer; $this->entityManager = $entityManager; } public function import() { // $items = ... $this->entityManager->transactional(function () use ($items) { $this->entityManager->flush(); foreach ($items as $item) { $this->importProcessItemRabbitMqProducer->publish($item->getId()); } }); } }
示例表示一个导入,它被分成项目,然后可以逐个异步处理。所有持久化相关操作都使用 EntityManager::transactional()
包裹在事务中。首先,EntityManager
被刷新,这意味着如果存储数据时出现任何错误,将抛出异常,并且消息永远不会发布到 RabbitMQ。如果 Doctrine 刷新的数据是正常的,那么消息将立即发布或等待所有嵌套事务提交。
这确保了在发布消息之前,基于 RabbitMQ 消息的数据始终存在于数据库中,从而解决了上一节中提到的第一个和第二个情况。
第三种情况——数据已保存到数据库,但RabbitMQ消息从未发布——仍然可能发生,这通常发生在提交后和消息发布之间的应用程序出现问题。但鉴于发布逻辑非常简单,并且没有涉及业务逻辑,这种情况几乎不会发生,最常见的情况可能是RabbitMQ实例不可达。
配置
配置结构及默认值列表
# config/packages/rabbit_mq_database_transaction_producer.yaml rabbit_mq_database_transaction_producer: # Whether custom connection class for DBAL is used in the project, see below for details. # When this is false, custom connection class from this bundle is used. custom_connection_class: false
自定义连接类
Doctrine DBAL无法通过组合添加功能到Doctrine\DBAL\Connection
,以便可以从多个来源组合添加的功能。唯一扩展功能的方法是通过扩展原始类,并配置Doctrine使用该类,而不是使用dbal.wrapper_class
配置选项。
如果您已经使用自定义连接实现,您必须确保它实现了VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\Doctrine\Connection\AfterCommitCallbacksConnection
,以便与该包兼容,即实现addAfterCommitCallback
方法,并确保在事务提交后触发回调。
如果您没有使用任何自定义实现,此包将提供实现,它添加了回调行为,并且还提供由调用回调引发的异常的日志记录。
服务覆盖
您还可以覆盖内部使用的服务,例如,如果您使用非标准日志记录器,您可以使用alias
提供自定义实例。
services: my_logger: class: Monolog\Logger arguments: - 'my_channel' rabbit_mq_database_transaction_producer_bundle.logger: @my_logger
安装
使用Composer
安装包mhujer/rabbit-mq-database-transaction-producer-bundle
composer require mhujer/rabbit-mq-database-transaction-producer-bundle
在您的应用程序中注册该包
// config/bundles.php return [ // ... VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\RabbitMqDatabaseTransactionProducerBundle::class => ['all' => true], ];