vascek-purchart / rabbit-mq-database-transaction-producer-bundle
当数据库事务提交时向 RabbitMQ 生产者发布消息
Requires
- php: ~7.2
- doctrine/doctrine-bundle: ~1.3
- php-amqplib/rabbitmq-bundle: ~1.9
- psr/log: ~1.0
- symfony/config: ~3.0|~4.0
- symfony/dependency-injection: ~3.2|~4.0
- symfony/http-kernel: ~3.0|~4.0
- symfony/yaml: ~3.0|~4.0
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-13 00:48:25 UTC
README
当数据库事务提交时向 RabbitMQ 生产者发布消息
注意:此包期望您正在使用 Doctrine DBAL & ORM Bundle 和 RabbitMqBundle
问题
数据库事务 确保一系列操作要么全部“完成”,要么一个也不“完成”。这对于大多数应用程序来说非常重要,因为否则它们的状态就会变得混乱。这就是为什么大多数数据库系统都提供事务(至少在某种程度上)。当您使用多个系统时,会出现新的问题,因为通常没有方法来确保跨所有系统的操作具有事务性。
此包提供了解决方案,以减轻使用 RabbitMQ 和 SQL 数据库(通过 Doctrine)时由此问题引起的最常见情况。SQL 数据库和 RabbitMQ 都有自己的事务,但无法扩展系统间的事务,这可能导致许多错误情况,通常
- 您将 ID 发布到 RabbitMQ 队列中,该 ID 应异步处理,但它从未提交到数据库。
- 您将 ID 发布到 RabbitMQ 队列中,该 ID 应异步处理,但它尚未提交到数据库。
- 一切已提交到数据库,但相关的消息从未发送到队列。
如果您使用嵌套事务,这会更加常见,因为那时仅通过查看“本地”代码很难判断实际何时提交事务。
这正是使用 Doctrine ORM 时的情况,因为即使您调用 flush
,您也无法确定是否有未提交的事务包裹在此调用。
此包做什么
此包不声称“解决”问题,因为几乎不可能,但它试图减轻由问题引起的几乎所有实际情况。当向 RabbitMQ 发布消息时,此包将检查数据库连接上是否有打开的事务(包括嵌套),如果没有,则立即发送消息。但是,当它检测到有打开的事务时,则将存储消息,并且仅在所有连接上的事务提交后才会发送消息。
当像以下示例中那样编写代码时,上一节中提到的所有情况都不应引起问题
<?php use Doctrine\ORM\EntityManager; use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface as Producer; class ImportFacade { /** @var \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface */ private $importProcessItemRabbitMqProducer; /** @var \Doctrine\ORM\EntityManager */ private $entityManager; public function __construct( Producer $importProcessItemRabbitMqProducer, EntityManager $entityManager ) { $this->importProcessItemRabbitMqProducer = $importProcessItemRabbitMqProducer; $this->entityManager = $entityManager; } public function import() { // $items = ... $this->entityManager->transactional(function () use ($items) { $this->entityManager->flush(); foreach ($items as $item) { $this->importProcessItemRabbitMqProducer->publish($item->getId()); } }); } }
该示例表示一个导入操作,它被拆分为项目,然后可以逐个异步处理。所有持久化相关操作都使用 EntityManager::transactional()
包裹在事务中。首先,刷新 EntityManager
,这意味着如果存储数据时出现任何错误,将抛出异常,并且消息永远不会发布到 RabbitMQ。如果 Doctrine 刷新的数据正常,则消息将立即发布或等到所有嵌套事务提交后发布。
这确保了在发布消息之前,基于 RabbitMQ 消息的数据始终存在于数据库中,因此解决了上一节中的前两种情况。
第三种情况——数据已保存到数据库,但RabbitMQ消息从未发布——不幸的是仍然可能发生——这种情况会在提交后和消息发布之间出现应用程序问题。但由于发布逻辑非常简单,且没有涉及业务逻辑,这种情况几乎不会发生,最常见的情况可能是RabbitMQ实例无法访问。
配置
配置结构及默认值列表
# config/packages/rabbit_mq_database_transaction_producer.yaml rabbit_mq_database_transaction_producer: # Whether custom connection class for DBAL is used in the project, see below for details. # When this is false, custom connection class from this bundle is used. custom_connection_class: false
自定义连接类
Doctrine DBAL无法通过组合添加功能到Doctrine\DBAL\Connection
,以便可以从多个来源组合添加的功能。唯一扩展功能的方法是通过扩展原始类,并将Doctrine配置为使用该类而不是使用dbal.wrapper_class
配置选项。
如果您已经使用自定义连接实现,您必须确保它实现了VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\Doctrine\Connection\AfterCommitCallbacksConnection
,以便与此捆绑包兼容,即实现addAfterCommitCallback
方法,并确保回调在事务提交后触发。
如果您没有使用任何自定义实现,此捆绑包将提供实现,该实现添加了回调行为,并在其之上提供由调用的回调引发的异常的日志记录。
服务覆盖
您也可以覆盖内部使用的服务,例如,如果您使用非标准日志记录器,您可以提供带有别名的自定义实例。
services: my_logger: class: Monolog\Logger arguments: - 'my_channel' rabbit_mq_database_transaction_producer_bundle.logger: @my_logger
安装
使用Composer
安装包vasek-purchart/rabbit-mq-database-transaction-producer-bundle
composer require vasek-purchart/rabbit-mq-database-transaction-producer-bundle
在您的应用程序中注册此捆绑包
// config/bundles.php return [ // ... VasekPurchart\RabbitMqDatabaseTransactionProducerBundle\RabbitMqDatabaseTransactionProducerBundle::class => ['all' => true], ];