robertbaelde/persisting-message-bus

此包的最新版本(0.0.2)没有可用的许可信息。

0.0.2 2023-07-26 08:26 UTC

This package is auto-updated.

Last update: 2024-09-26 10:48:56 UTC


README

此包将提供一个持久化其消息的消息总线。这可以用于与公共事件的跨上下文通信。消息总线将针对一个主题。主题是一组事件类型。这允许消费上下文只知道主题及其事件类型,而不知道它们来自何处。

安装

composer require robertbaelde/persisting-message-bus

使用方法

配置主题

主题是一组消息类,映射到字符串名称。每个主题都必须有一个唯一的名称。

<?php

use Robertbaelde\PersistingMessageBus\BaseTopic;

class TestTopic extends BaseTopic
{
    public const SimpleDomainMessage = SimpleDomainMessage::class;

    public function getMessages(): array
    {
        return [
            'SimpleDomainMessage' => self::SimpleDomainMessage
        ];
    }

    public function getName(): string
    {
        return 'TestTopic';
    }
}

消息必须实现PublicMessage接口

use Robertbaelde\PersistingMessageBus\PublicMessage;

class SimpleDomainMessage implements PublicMessage
{
}

分发消息

为了分发消息,您需要一个消息总线。这可以通过使用主题和消息存储库来构建。

使用此主题,您可以构建一个MessageDispatcher,可用于分发消息。消息分发器可以使用MessageDecorator装饰消息。

$message = new SimpleDomainMessage('bar');

$messageBus = new MessageBus(
    new TestTopic(),
    $this->messageRepository
);

$messageDispatcher = new MessageDispatcher(
    $messageBus,
    new DefaultMessageDecorator(new SystemClock()),
);

$messageDispatcher->dispatch($message);

消费消息

为了消费消息,您需要一个消息总线和一个跟踪消息流偏移量的存储库。Eventsauce的消息消费者可以用作消费者。

$messageConsumer = new MessageConsumer(
    messageBus: $this->messageBus,
    messageConsumerState: new InMemoryMessageConsumerState(),
    messageConsumer: $consumer
);

$messageConsumer->handleNewMessages();

使用消息消费者时,您可能希望在循环中运行handleNewMessages。请确保一次只有一个进程处理新消息。否则,消息可能会被处理两次。

Illuminate存储库

为了持久化消息和消费者状态,提供了2个存储库。

消息的数据库模式

CREATE TABLE IF NOT EXISTS `public_messages` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `message_id` varchar (255) NOT NULL,
  `topic` varchar (255) NOT NULL,
  `message_type` varchar (255) NOT NULL,
  `payload` varchar (1200) NOT NULL,
  `headers` varchar (1200) NOT NULL,
  `published_at` timestamp NOT NULL,
  PRIMARY KEY (`id` ASC),
  KEY `topic` (`topic`, `id` ASC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

消费者的数据库模式

CREATE TABLE IF NOT EXISTS `message_consumer_state` (
    `consumer_name` varchar (255) NOT NULL,
    `cursor` varchar (1200) NOT NULL,
    `last_updated_at` timestamp NOT NULL,
    PRIMARY KEY (`consumer_name`),
    KEY `consumer_name` (`consumer_name`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

路线图

  • 添加一个消费者,用于跨服务同步发出HTTP请求
  • 添加相关性与因果关系ID的系统?
  • 消息收件箱模式
  • 消息发件箱模式

许可

MIT许可(MIT)。有关更多信息,请参阅许可文件