bytespin / messenger-dedupe-bundle
提供在 Symfony messenger 和 doctrine 传输使用时,messenger/messages 去重功能
Requires
- php: >=8.2
- ext-readline: *
- doctrine/doctrine-bundle: ^2.9
- doctrine/orm: ^2.10
- symfony/doctrine-messenger: ^7.1
- symfony/flex: ^2
- symfony/framework-bundle: ^7.1
- symfony/messenger: ^7.1
- symfony/validator: ^7.1
README
版权 (c) 2023 Greg LAMY greg@bytespin.net
这是一个托管在 GitHub 上的公共项目: https://github.com/ByteSpin/MessengerDedupeBundle
这最初是作为一个 ETL 项目的部分开发的。
ByteSpin/MessengerDedupeBundle 是一个 Symfony 6.3 bundle,旨在帮助管理使用 Doctrine 传输时的 messenger_messages 唯一性。
注意
此项目仍处于 alpha 状态,尚未在其父项目中完全测试。
请随时提交错误和/或拉取请求!
您可以查看 CHANGELOG 以查看最新的改进和修复。
只要记住,我想让它尽可能简单!
要求
- php 8.2+
- Symfony 6.3+
安装
首先安装包
composer require bytespin/messenger-dedupe-bundle
然后更新数据库模式
php bin/console doctrine:schema:update --force
手动包注册
您需要手动在您的应用程序中注册该包。
为此,请按照以下步骤操作
-
在您的 Symfony 应用程序中打开文件
config/bundles.php
。 -
将以下行添加到此文件返回的数组中
ByteSpin\MessengerDedupeBundle\MessengerDedupeBundle::class => ['all' => true],
-
保存文件。您的包现在已注册,并准备好在您的应用程序中使用。
请确保在安装包后使用 Composer 安装包,但在您的应用程序中使用其任何功能之前执行此步骤。
配置
您必须配置要用于 ByteSpin\MessengerDedupeBundle 实体的实体管理器。这必须在安装后执行一次。我们提供了一个脚本来自动化此步骤;请运行
bin/console bytespin:configure-messenger-dedupe
如果您愿意自己完成此操作,请在 doctrine.yaml 中实体管理器的 'mappings:' 键中添加以下行
# src/config/packages/doctrine.yaml doctrine: dbal: (...) orm: (...) entity_managers: your_entity_manager: (...) mappings: ByteSpin\MessengerDedupeBundle: is_bundle: false type: attribute dir: '%kernel.project_dir%/vendor/bytespin/messenger-dedupe-bundle/src/Entity' prefix: ByteSpin\MessengerDedupeBundle\Entity alias: MessengerDedupeBundle
重要
如果您的项目包含映射到多个实体管理器的实体,请务必不要在 doctrine 配置中使用 auto_mapping: true。
这会阻止 bundle 中使用的 getManagerForClass() 函数正常工作!
这可能会发生在您决定在多个 symfony 项目之间共享 messenger_messages 表时使用 MessengerDedupeBundle。
在这种情况下
- 在运行配置脚本时选择正确的实体管理器,
- 请确保从 doctrine.yaml(或将它设置为 false)中删除 'auto_mapping: true' 键,
- 请确保您的所有实体都正确映射在 doctrine.yaml 的 'mappings:' 部分
消息去重
此功能通过自定义 Middleware 和 Envelope Stamp,帮助在 Doctrine 传输使用时避免相同的消息(您决定在这种情况下什么相同)在 messenger_messages 表中累积。
用法
您必须在您的 messenger.yaml 配置文件中启用去重 Middleware
# config/packages/messenger.yaml framework: messenger: buses: messenger.bus.default: middleware: - ByteSpin\MessengerDedupeBundle\Middleware\DeduplicationMiddleware
注意
去重 middleware 必须在所有其他自定义或标准 symfony middleware 之前执行
将其放在中间件列表的第一位即可
不要忘记使用以下类并初始化您的消息总线
use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\HashStamp; use ByteSpin\MessengerDedupeBundle\Processor\HashProcessor;
public function __construct( private MessageBusInterface $messageBus, private HashProcessor $hashProcessor, ) { }
当您需要发送消息时,您必须首先计算使此消息独特的哈希值
例如
$messageHash = $this->hashProcessor->makeHash('TheMessageType'.$TheFirstVariable.$TheSecondVariable);
然后您可以使用自定义 stamp 发送您的消息
$this->messageBus->dispatch( new Envelope( $message, [ new TransportNamesStamp('async'), new HashStamp($messageHash), ] ) );
注意
没有HashStamp的消息将被中间件忽略
这就完了!
当一个消息通过doctrine传输带有HashStamp戳时,去重中间件将首先检查是否存在类似的哈希
- 如果存在,将返回信封(消息不会被发送以避免重复)
- 如果没有,将保存哈希并发送消息
事件订阅者负责在消息被处理时删除哈希
多Symfony应用程序之间的使用
在更复杂的架构中,一个Symfony应用程序(让我们称它为主应用程序)可以用来生成一些消息发送到其他远程Symfony应用程序(让我们称它们为远程应用程序)。去重哈希总是存储在主应用程序中。远程应用程序可以消费消息,但去重哈希仍然存储在主应用程序中。
通过一个简单的EventSubscriber监听WorkerMessageHandledEvent和/或WorkerMessageFailedEvent,远程应用程序可以向正确的Master应用程序传输/队列发送特定的RemoveDedupeHash消息以删除去重哈希。
在这种情况下
- 所有应用程序都必须使用ByteSpin/MessengerDedupeBundle来避免MessageDecodingFailedException
- 所有应用程序都必须共享兼容的messenger传输/队列配置
- 该捆绑包提供一个新的MasterStamp,该MasterStamp将被包含在生成的消息中。
- 该捆绑包还提供一个新的MessageHandler,该MessageHandler监听远程生成的RemoveDedupeHash消息
例如,主应用程序生成一条消息
(...) $this->messageBus->dispatch( new Envelope( $message, [ new TransportNamesStamp('remote_async_transport'), new HashStamp($messageHash), new MasterStamp('initiator_async_transport') ] ) );
在远程应用程序上,一个简单的EventSubscriber负责将RemoveHash消息发送到主应用程序
<?php use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\HashStamp; use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\InitiatorStamp; use ByteSpin\MessengerDedupeBundle\Model\RemoveDedupeHash; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\TransportNamesStamp; readonly class MessageHandledOrFailedEventSubscriber implements EventSubscriberInterface { public function __construct( private MessageBusInterface $messageBus, ) { } public static function getSubscribedEvents(): array { return [ WorkerMessageHandledEvent::class => 'onMessageProcessed', WorkerMessageFailedEvent::class => 'onMessageProcessed', ]; } public function onMessageProcessed(WorkerMessageHandledEvent $event): void { // remove hash on remote message initiator, only if a remote master has been defined $envelope = $event->getEnvelope(); $hashStamp = $envelope->last(HashStamp::class); $masterStamp = $envelope->last(MasterStamp::class); if ($hashStamp && $masterStamp) { $transportName = $masterStamp->getMaster(); $hash = $hashStamp->getHash(); $this->messageBus->dispatch( new Envelope( new RemoveDedupeHash( $hash ), [ new TransportNamesStamp($transportName), ] ) ); } } }
许可
该项目受MIT许可证许可 - 有关详细信息,请参阅LICENSE文件。