robertbaelde / persisting-message-bus
此包的最新版本(0.0.2)没有可用的许可信息。
0.0.2
2023-07-26 08:26 UTC
Requires
- eventsauce/backoff: ^1.1
- eventsauce/eventsauce: ^3
- illuminate/database: ^8|^9|^10
- ramsey/uuid: ^4
Requires (Dev)
- phpunit/phpunit: ^9.5
This package is auto-updated.
Last update: 2024-09-26 10:48:56 UTC
README
此包将提供一个持久化其消息的消息总线。这可以用于与公共事件的跨上下文通信。消息总线将针对一个主题。主题是一组事件类型。这允许消费上下文只知道主题及其事件类型,而不知道它们来自何处。
安装
composer require robertbaelde/persisting-message-bus
使用方法
配置主题
主题是一组消息类,映射到字符串名称。每个主题都必须有一个唯一的名称。
<?php use Robertbaelde\PersistingMessageBus\BaseTopic; class TestTopic extends BaseTopic { public const SimpleDomainMessage = SimpleDomainMessage::class; public function getMessages(): array { return [ 'SimpleDomainMessage' => self::SimpleDomainMessage ]; } public function getName(): string { return 'TestTopic'; } }
消息必须实现PublicMessage接口
use Robertbaelde\PersistingMessageBus\PublicMessage; class SimpleDomainMessage implements PublicMessage { }
分发消息
为了分发消息,您需要一个消息总线。这可以通过使用主题和消息存储库来构建。
使用此主题,您可以构建一个MessageDispatcher,可用于分发消息。消息分发器可以使用MessageDecorator装饰消息。
$message = new SimpleDomainMessage('bar'); $messageBus = new MessageBus( new TestTopic(), $this->messageRepository ); $messageDispatcher = new MessageDispatcher( $messageBus, new DefaultMessageDecorator(new SystemClock()), ); $messageDispatcher->dispatch($message);
消费消息
为了消费消息,您需要一个消息总线和一个跟踪消息流偏移量的存储库。Eventsauce的消息消费者可以用作消费者。
$messageConsumer = new MessageConsumer( messageBus: $this->messageBus, messageConsumerState: new InMemoryMessageConsumerState(), messageConsumer: $consumer ); $messageConsumer->handleNewMessages();
使用消息消费者时,您可能希望在循环中运行handleNewMessages。请确保一次只有一个进程处理新消息。否则,消息可能会被处理两次。
Illuminate存储库
为了持久化消息和消费者状态,提供了2个存储库。
消息的数据库模式
CREATE TABLE IF NOT EXISTS `public_messages` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `message_id` varchar (255) NOT NULL, `topic` varchar (255) NOT NULL, `message_type` varchar (255) NOT NULL, `payload` varchar (1200) NOT NULL, `headers` varchar (1200) NOT NULL, `published_at` timestamp NOT NULL, PRIMARY KEY (`id` ASC), KEY `topic` (`topic`, `id` ASC) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
消费者的数据库模式
CREATE TABLE IF NOT EXISTS `message_consumer_state` ( `consumer_name` varchar (255) NOT NULL, `cursor` varchar (1200) NOT NULL, `last_updated_at` timestamp NOT NULL, PRIMARY KEY (`consumer_name`), KEY `consumer_name` (`consumer_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
路线图
- 添加一个消费者,用于跨服务同步发出HTTP请求
- 添加相关性与因果关系ID的系统?
- 消息收件箱模式
- 消息发件箱模式
许可
MIT许可(MIT)。有关更多信息,请参阅许可文件。