bytespin / messenger-dedupe-bundle

提供在 Symfony messenger 和 doctrine 传输使用时,messenger/messages 去重功能

1.0.14 2024-09-02 17:21 UTC

This package is auto-updated.

Last update: 2024-10-02 17:29:46 UTC


README

版权 (c) 2023 Greg LAMY greg@bytespin.net

这是一个托管在 GitHub 上的公共项目: https://github.com/ByteSpin/MessengerDedupeBundle

这最初是作为一个 ETL 项目的部分开发的。

ByteSpin/MessengerDedupeBundle 是一个 Symfony 6.3 bundle,旨在帮助管理使用 Doctrine 传输时的 messenger_messages 唯一性。

注意

此项目仍处于 alpha 状态,尚未在其父项目中完全测试。

请随时提交错误和/或拉取请求!

您可以查看 CHANGELOG 以查看最新的改进和修复。

只要记住,我想让它尽可能简单!

要求

  • php 8.2+
  • Symfony 6.3+

安装

首先安装包

composer require bytespin/messenger-dedupe-bundle

然后更新数据库模式

php bin/console doctrine:schema:update --force

手动包注册

您需要手动在您的应用程序中注册该包。

为此,请按照以下步骤操作

  1. 在您的 Symfony 应用程序中打开文件 config/bundles.php

  2. 将以下行添加到此文件返回的数组中

        ByteSpin\MessengerDedupeBundle\MessengerDedupeBundle::class => ['all' => true],
  3. 保存文件。您的包现在已注册,并准备好在您的应用程序中使用。

请确保在安装包后使用 Composer 安装包,但在您的应用程序中使用其任何功能之前执行此步骤。

配置

您必须配置要用于 ByteSpin\MessengerDedupeBundle 实体的实体管理器。这必须在安装后执行一次。我们提供了一个脚本来自动化此步骤;请运行

bin/console bytespin:configure-messenger-dedupe

如果您愿意自己完成此操作,请在 doctrine.yaml 中实体管理器的 'mappings:' 键中添加以下行

# src/config/packages/doctrine.yaml
doctrine:
   dbal:
   (...)
   orm:
   (...)
   entity_managers:
      your_entity_manager:
      (...)
      mappings:
         ByteSpin\MessengerDedupeBundle:
         is_bundle: false
         type: attribute
         dir: '%kernel.project_dir%/vendor/bytespin/messenger-dedupe-bundle/src/Entity'
         prefix: ByteSpin\MessengerDedupeBundle\Entity
         alias: MessengerDedupeBundle


重要

如果您的项目包含映射到多个实体管理器的实体,请务必不要在 doctrine 配置中使用 auto_mapping: true。

这会阻止 bundle 中使用的 getManagerForClass() 函数正常工作!

这可能会发生在您决定在多个 symfony 项目之间共享 messenger_messages 表时使用 MessengerDedupeBundle。

在这种情况下

  • 在运行配置脚本时选择正确的实体管理器,
  • 请确保从 doctrine.yaml(或将它设置为 false)中删除 'auto_mapping: true' 键,
  • 请确保您的所有实体都正确映射在 doctrine.yaml 的 'mappings:' 部分

消息去重

此功能通过自定义 Middleware 和 Envelope Stamp,帮助在 Doctrine 传输使用时避免相同的消息(您决定在这种情况下什么相同)在 messenger_messages 表中累积。

用法

您必须在您的 messenger.yaml 配置文件中启用去重 Middleware

# config/packages/messenger.yaml

framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - ByteSpin\MessengerDedupeBundle\Middleware\DeduplicationMiddleware

注意

去重 middleware 必须在所有其他自定义或标准 symfony middleware 之前执行

将其放在中间件列表的第一位即可

不要忘记使用以下类并初始化您的消息总线

use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\HashStamp;
use ByteSpin\MessengerDedupeBundle\Processor\HashProcessor;
public function __construct(
        private MessageBusInterface $messageBus,
        private HashProcessor $hashProcessor,
    ) {
    }

当您需要发送消息时,您必须首先计算使此消息独特的哈希值

例如

$messageHash = $this->hashProcessor->makeHash('TheMessageType'.$TheFirstVariable.$TheSecondVariable);

然后您可以使用自定义 stamp 发送您的消息

$this->messageBus->dispatch(
                new Envelope(
                    $message,
                    [
                        new TransportNamesStamp('async'),
                        new HashStamp($messageHash),
                    ]
                )
            );

注意

没有HashStamp的消息将被中间件忽略

这就完了!

当一个消息通过doctrine传输带有HashStamp戳时,去重中间件将首先检查是否存在类似的哈希

  • 如果存在,将返回信封(消息不会被发送以避免重复)
  • 如果没有,将保存哈希并发送消息

事件订阅者负责在消息被处理时删除哈希

多Symfony应用程序之间的使用

在更复杂的架构中,一个Symfony应用程序(让我们称它为主应用程序)可以用来生成一些消息发送到其他远程Symfony应用程序(让我们称它们为远程应用程序)。去重哈希总是存储在主应用程序中。远程应用程序可以消费消息,但去重哈希仍然存储在主应用程序中。

通过一个简单的EventSubscriber监听WorkerMessageHandledEvent和/或WorkerMessageFailedEvent,远程应用程序可以向正确的Master应用程序传输/队列发送特定的RemoveDedupeHash消息以删除去重哈希。

在这种情况下

  • 所有应用程序都必须使用ByteSpin/MessengerDedupeBundle来避免MessageDecodingFailedException
  • 所有应用程序都必须共享兼容的messenger传输/队列配置
  • 该捆绑包提供一个新的MasterStamp,该MasterStamp将被包含在生成的消息中。
  • 该捆绑包还提供一个新的MessageHandler,该MessageHandler监听远程生成的RemoveDedupeHash消息

例如,主应用程序生成一条消息

(...)
$this->messageBus->dispatch(
                new Envelope(
                    $message,
                    [
                        new TransportNamesStamp('remote_async_transport'),
                        new HashStamp($messageHash),
                        new MasterStamp('initiator_async_transport')
                    ]
                )
            );

在远程应用程序上,一个简单的EventSubscriber负责将RemoveHash消息发送到主应用程序

<?php

use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\HashStamp;
use ByteSpin\MessengerDedupeBundle\Messenger\Stamp\InitiatorStamp;
use ByteSpin\MessengerDedupeBundle\Model\RemoveDedupeHash;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

readonly class MessageHandledOrFailedEventSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private MessageBusInterface $messageBus,

    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerMessageHandledEvent::class => 'onMessageProcessed',
            WorkerMessageFailedEvent::class => 'onMessageProcessed',
        ];
    }

    public function onMessageProcessed(WorkerMessageHandledEvent $event): void
    {
        // remove hash on remote message initiator, only if a remote master has been defined
        $envelope = $event->getEnvelope();
        $hashStamp = $envelope->last(HashStamp::class);
        $masterStamp = $envelope->last(MasterStamp::class);
        if ($hashStamp && $masterStamp) {
            $transportName = $masterStamp->getMaster();
            $hash = $hashStamp->getHash();
            $this->messageBus->dispatch(
                new Envelope(
                    new RemoveDedupeHash(
                        $hash
                    ),
                    [
                        new TransportNamesStamp($transportName),
                    ]
                )
            );

        }
    }
}

许可

该项目受MIT许可证许可 - 有关详细信息,请参阅LICENSE文件。