sasa-b / command-bus
消息总线模式实现库
Requires
- php: >=8.2
- psr/container: *
- psr/log: *
Requires (Dev)
- phpstan/phpstan: *
- phpstan/phpstan-phpunit: *
- phpunit/phpunit: ^10.0
- ramsey/uuid: ^4.1
README
通常称为 命令总线 模式,但该库在 命令 和 查询 之间做出区分,并允许你在命令处理器中强制不返回任何值,以保持与 CQRS 模式 一致。
这是一个 独立库,仅有的两个依赖项是 PSR-11 容器 和 PSR-3 日志 接口,以允许更好的互操作性。
目录
入门
使用 composer 安装库
composer require sco/message-bus
独立使用
您需要遵循 PSR-4 自动加载标准,并创建自己的服务容器类,这涉及到实现 Psr\Container\ContainerInterface
接口,它可以像库为其测试套件使用的 Sco\MessageBus\Tests\Stub\Container\InMemoryContainer
那样简单,或者您可以通过 composer require 一个遵循 PSR-11 标准 的服务容器库,如 PHP-DI。
require 'vendor/autoload.php' $container = new InMemoryContainer($services) $bus = new \Sco\MessageBus\Bus($container); $bus->dispatch(new FindPostByIdQuery(1))
与 Symfony 框架一起使用
我们可以采用两种方法,装饰库提供的 Bus 类,或者注入服务定位器。有关更多信息,您可以阅读 Symfony 文档
装饰 Bus
我们可以创建一个新的装饰器类,该类将实现 Symfony 的 Symfony\Contracts\Service\ServiceSubscriberInterface
接口
use Sco\MessageBus\Bus; use Sco\MessageBus\Message; use Sco\MessageBus\Result; use Psr\Container\ContainerInterface; use Symfony\Contracts\Service\ServiceSubscriberInterface; class MessageBus implements ServiceSubscriberInterface { private Bus $bus; public function __construct(ContainerInterface $locator) { $this->bus = new Bus($locator, [], null, new UuidV4Identity()); } public function dispatch(\Sco\MessageBus\Message $message): Result { return $this->bus->dispatch($message); } public static function getSubscribedServices(): array { return [ FindPostByIdHandler::class, SavePostHandler::class ]; } }
使用这种方法,您应用程序中的所有处理器都必须添加到由 getSubscribedServices
返回的数组中,因为 Symfony 中的服务默认不是公开的,而且也不应该是公开的,除非您将处理器添加到这个数组中,否则在映射器完成映射后,它将无法找到处理器,并抛出一个服务未找到的容器异常。
注入服务定位器
另一种方法是向库的 Bus 注入一个包含所有处理器的服务定位器。这将在服务注册 yaml 文件中完成。
匿名服务定位器
services: _defaults: autowire: true autoconfigure: true # Anonymous Service Locator Sco\MessageBus\Bus: arguments: $container: !service_locator '@FindPostByIdHandler': 'handler_one' '@SavePostHandler': 'handler_two'
显式服务定位器定义
services: _defaults: autowire: true autoconfigure: true # Explicit Service Locator message_handler_service_locator: class: Symfony\Component\DependencyInjection\ServiceLocator arguments: - '@FindPostByIdHandler' - '@SavePostHandler' Sco\MessageBus\Bus: arguments: $container: '@message_handler_service_locator'
让我们扩展这些配置,并使用 Symfony 服务容器的标签功能自动将处理器添加到 Bus 中
使用 !tagged_locator
services: _defaults: autowire: true autoconfigure: true _instanceof: Sco\MessageBus\Handler: tags: ['message_handler'] # Anonymous Service Locator Sco\MessageBus\Bus: arguments: $container: !tagged_locator message_handler
显式服务定位器定义
services: _defaults: autowire: true autoconfigure: true _instanceof: Sco\MessageBus\Handler: tags: ['message_handler'] # Explicit Service Locator message_handler_service_locator: class: Symfony\Component\DependencyInjection\ServiceLocator arguments: - !tagged_iterator message_handler Sco\MessageBus\Bus: arguments: $container: '@message_handler_service_locator'
与 Laravel 框架一起使用
要有效地与 Laravel 框架一起使用,您只需要在 Laravel 的服务容器 中注册 Bus,并将容器作为参数传递给库的 Bus 类
$this->app->bind(\Sco\MessageBus\Bus::class, function ($app) { return new \Sco\MessageBus\Bus($app); });
核心概念
身份
每个 Command 或 Query 及其相应的 Result 对象组合将被分配一个唯一的身份,例如,一个 Command 和其相应的 Result 对象将有一个身份 00000001
。这可以用于日志记录、审计或调试目的。
默认的标识符生成策略是一个简单的Sco\MessageBus\Identity\RandomString
生成器,以将外部依赖降到最低。要使用其他东西,您可以要求一个库,例如\Sco\MessageBus\Identity
。
use Sco\MessageBus\Identity; class UuidIdentity implements Identity { public function generate() : string { return Uuid::uuid7()->toString(); } }
处理器映射策略
- 按名称映射 - 此策略考虑了完全限定名(FQN),并在类名中要求命令或查询后缀。例如,一个
FindPostByIdQuery
将被映射到FindPostByIdHandler
,或者一个SavePostCommand
将被映射到SavePostHandler
。 - 按属性映射 - 此策略使用PHP属性,在命令/查询类中添加
#[IsCommand(handler: SavePostHandler::class)]
或#[IsQuery(handler: FindPostByIdHandler::class)]
。handler
参数名称可以省略,这取决于您的个人喜好。 - 自定义 - 如果您想创建自己的自定义映射策略,可以通过实现
Sco\MessageBus\Mapper
接口来做到这一点。
中间件
每个命令都将通过一系列中间件。默认情况下,链为空,但库确实提供了一些内置的中间件。
- 事件中间件 - 在处理命令或查询之前和之后,以及失败时引发事件。
- 事务中间件 - 在事务中运行单个命令或查询,
begin
、commit
和rollback
步骤是普通的\Closure
对象,因此您可以使用您喜欢的任何ORM或持久化方法。 - 空结果中间件 - 如果命令结果返回的不仅仅是null,则抛出异常,以强制执行命令-查询分离。
- 不可变结果中间件 - 如果您的结果对象上没有定义readonly修饰符的属性,则抛出异常。
要创建自己的自定义中间件,需要实现Sco\MessageBus\Middleware
接口并将其提供给总线。
use Sco\MessageBus\Bus; use Sco\MessageBus\Message; use Sco\MessageBus\Middleware; class CustomMiddleware implements Middleware { public function __invoke(Message $message,\Closure $next) : mixed { // Do something before message handling $result = $next($message); // Do something after message handling return $result; } } $bus = new Bus(middlewares: [new CustomMiddleware()]);
事件
如果您添加了Sco\MessageBus\Middleware\EventMiddleware
,您将能够订阅以下事件:
消息接收事件 - 在消息被接收但在处理之前引发。
use Sco\MessageBus\Event\Subscriber; use Sco\MessageBus\Event\MessageReceivedEvent; $subscriber = new Subscriber(); $subscriber->addListener(MessageReceivedEvent::class, function (MessageReceivedEvent $event) { $event->getName(); // Name of the Event $event->getMessage();; // Command or Query that has been received });
消息处理事件 - 在成功处理消息后引发。
use Sco\MessageBus\Event\Subscriber; use Sco\MessageBus\Event\MessageHandledEvent; $subscriber = new Subscriber(); $subscriber->addListener(MessageHandledEvent::class, function (MessageHandledEvent $event) { $event->getName(); // Name of the Event $event->getMessage(); // Command or Query being handled $event->getResult(); // Result for the handled message });
消息失败事件 - 在消息处理失败并抛出异常时引发。
use Sco\MessageBus\Event\Subscriber; use Sco\MessageBus\Event\MessageFailedEvent; $subscriber = new Subscriber(); $subscriber->addListener(MessageFailedEvent::class, function (MessageFailedEvent $event) { $event->getName(); // Name of the Event $event->getMessage(); // Command or Query being handled $event->getError(); // Captured Exception });
事务
事务中间件接受三个函数参数,每个参数对应事务的每个阶段:开始、提交和回滚。采用此方法允许您使用您喜欢的任何ORM,甚至使用原生的PDO对象与持久层交互。
$pdo = new \PDO('{connection_dsn}') $transaction = new \Sco\MessageBus\Middleware\TransactionMiddleware( fn(): bool => $pdo->beginTransaction(), fn(): bool => $pdo->commit(), fn(\Throwable $error): bool => $pdo->rollBack(), );
结果类型
库将处理器的返回值包装到结果值对象中,以提供一致的API,并确保返回值始终为同一类型。
所有结果值对象都扩展了Sco\MessageBus\Result
抽象类,可以分为3组
- 包装原始值的那一组
Sco\MessageBus\Result\Boolean
Sco\MessageBus\Result\Integer
Sco\MessageBus\Result\Numeric
Sco\MessageBus\Result\Text
Sco\MessageBus\Result\None
(包装null值)
Sco\MessageBus\Result\Delegated
,它包装对象并将调用委托给底层对象的方法Sco\MessageBus\Result\Collection
和Sco\MessageBus\Result\Map
,它们包装数字索引数组(列表)和字符串索引数组(映射),并实现\Countable
、\ArrayAccess
和\IteratorAggregate
接口
您还可以通过扩展抽象类Sco\MessageBus\Result
并返回它们来添加自己的自定义结果值对象。
贡献
风格指南
库遵循PSR-12标准。
待办事项
- 添加PSR Cache接口和实现以缓存结果