sasa-b/command-bus

消息总线模式实现库

3.0.0 2024-04-24 10:02 UTC

README

通常称为 命令总线 模式,但该库在 命令查询 之间做出区分,并允许你在命令处理器中强制不返回任何值,以保持与 CQRS 模式 一致。

这是一个 独立库,仅有的两个依赖项是 PSR-11 容器PSR-3 日志 接口,以允许更好的互操作性。

目录

入门

使用 composer 安装库

composer require sco/message-bus

独立使用

您需要遵循 PSR-4 自动加载标准,并创建自己的服务容器类,这涉及到实现 Psr\Container\ContainerInterface 接口,它可以像库为其测试套件使用的 Sco\MessageBus\Tests\Stub\Container\InMemoryContainer 那样简单,或者您可以通过 composer require 一个遵循 PSR-11 标准 的服务容器库,如 PHP-DI

require 'vendor/autoload.php'

$container = new InMemoryContainer($services)

$bus = new \Sco\MessageBus\Bus($container);

$bus->dispatch(new FindPostByIdQuery(1))

与 Symfony 框架一起使用

我们可以采用两种方法,装饰库提供的 Bus 类,或者注入服务定位器。有关更多信息,您可以阅读 Symfony 文档

装饰 Bus

我们可以创建一个新的装饰器类,该类将实现 Symfony 的 Symfony\Contracts\Service\ServiceSubscriberInterface 接口

use Sco\MessageBus\Bus;
use Sco\MessageBus\Message;
use Sco\MessageBus\Result;
use Psr\Container\ContainerInterface;
use Symfony\Contracts\Service\ServiceSubscriberInterface;

class MessageBus implements ServiceSubscriberInterface
{
    private Bus $bus;

    public function __construct(ContainerInterface $locator)
    {
        $this->bus = new Bus($locator, [], null, new UuidV4Identity());
    }

    public function dispatch(\Sco\MessageBus\Message $message): Result
    {
        return $this->bus->dispatch($message);
    }

    public static function getSubscribedServices(): array
    {
        return [
            FindPostByIdHandler::class,
            SavePostHandler::class
        ];
    }
}

使用这种方法,您应用程序中的所有处理器都必须添加到由 getSubscribedServices 返回的数组中,因为 Symfony 中的服务默认不是公开的,而且也不应该是公开的,除非您将处理器添加到这个数组中,否则在映射器完成映射后,它将无法找到处理器,并抛出一个服务未找到的容器异常。

注入服务定位器

另一种方法是向库的 Bus 注入一个包含所有处理器的服务定位器。这将在服务注册 yaml 文件中完成。

匿名服务定位器

services:
    _defaults:
      autowire: true      
      autoconfigure: true 

    # Anonymous Service Locator
    Sco\MessageBus\Bus:
      arguments:
        $container: !service_locator
                        '@FindPostByIdHandler': 'handler_one'
                        '@SavePostHandler': 'handler_two'

显式服务定位器定义

services:
    _defaults:
      autowire: true      
      autoconfigure: true 

    # Explicit Service Locator
    message_handler_service_locator:
      class: Symfony\Component\DependencyInjection\ServiceLocator
      arguments:
          - '@FindPostByIdHandler'
          - '@SavePostHandler' 

    Sco\MessageBus\Bus:
      arguments:
        $container: '@message_handler_service_locator'

让我们扩展这些配置,并使用 Symfony 服务容器的标签功能自动将处理器添加到 Bus 中

使用 !tagged_locator

services:
  _defaults:
    autowire: true
    autoconfigure: true
    
  _instanceof: 
    Sco\MessageBus\Handler:
      tags: ['message_handler']

  # Anonymous Service Locator
  Sco\MessageBus\Bus:
    arguments:
      $container: !tagged_locator message_handler

显式服务定位器定义

services:
  _defaults:
    autowire: true
    autoconfigure: true

  _instanceof:
    Sco\MessageBus\Handler:
      tags: ['message_handler']
      
  # Explicit Service Locator
  message_handler_service_locator:
    class: Symfony\Component\DependencyInjection\ServiceLocator
    arguments:
      - !tagged_iterator message_handler

  Sco\MessageBus\Bus:
    arguments:
      $container: '@message_handler_service_locator'

与 Laravel 框架一起使用

要有效地与 Laravel 框架一起使用,您只需要在 Laravel 的服务容器 中注册 Bus,并将容器作为参数传递给库的 Bus 类

$this->app->bind(\Sco\MessageBus\Bus::class, function ($app) {
    return new \Sco\MessageBus\Bus($app);
});

核心概念

身份

每个 CommandQuery 及其相应的 Result 对象组合将被分配一个唯一的身份,例如,一个 Command 和其相应的 Result 对象将有一个身份 00000001。这可以用于日志记录、审计或调试目的。

默认的标识符生成策略是一个简单的Sco\MessageBus\Identity\RandomString生成器,以将外部依赖降到最低。要使用其他东西,您可以要求一个库,例如(https://github.com/ramsey/uuid),并实现\Sco\MessageBus\Identity

use Sco\MessageBus\Identity;

class UuidIdentity implements Identity
{
    public function generate() : string
    {
        return Uuid::uuid7()->toString();
    }
}

处理器映射策略

  1. 按名称映射 - 此策略考虑了完全限定名(FQN),并在类名中要求命令查询后缀。例如,一个FindPostByIdQuery将被映射到FindPostByIdHandler,或者一个SavePostCommand将被映射到SavePostHandler
  2. 按属性映射 - 此策略使用PHP属性,在命令/查询类中添加#[IsCommand(handler: SavePostHandler::class)]#[IsQuery(handler: FindPostByIdHandler::class)]handler参数名称可以省略,这取决于您的个人喜好。
  3. 自定义 - 如果您想创建自己的自定义映射策略,可以通过实现Sco\MessageBus\Mapper接口来做到这一点。

中间件

每个命令都将通过一系列中间件。默认情况下,链为空,但库确实提供了一些内置的中间件。

  • 事件中间件 - 在处理命令或查询之前和之后,以及失败时引发事件。
  • 事务中间件 - 在事务中运行单个命令查询begincommitrollback步骤是普通的\Closure对象,因此您可以使用您喜欢的任何ORM或持久化方法。
  • 空结果中间件 - 如果命令结果返回的不仅仅是null,则抛出异常,以强制执行命令-查询分离
  • 不可变结果中间件 - 如果您的结果对象上没有定义readonly修饰符的属性,则抛出异常。

要创建自己的自定义中间件,需要实现Sco\MessageBus\Middleware接口并将其提供给总线。

use Sco\MessageBus\Bus;
use Sco\MessageBus\Message;
use Sco\MessageBus\Middleware;

class CustomMiddleware implements Middleware
{
    public function __invoke(Message $message,\Closure $next) : mixed
    {
        // Do something before message handling
        
        $result = $next($message);
        
        // Do something after message handling
        
        return $result;
    }
}

$bus = new Bus(middlewares: [new CustomMiddleware()]);

事件

如果您添加了Sco\MessageBus\Middleware\EventMiddleware,您将能够订阅以下事件:

消息接收事件 - 在消息被接收但在处理之前引发。

use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageReceivedEvent;

$subscriber = new Subscriber();

$subscriber->addListener(MessageReceivedEvent::class, function (MessageReceivedEvent $event) {
  $event->getName(); // Name of the Event
  $event->getMessage();; // Command or Query that has been received
});

消息处理事件 - 在成功处理消息后引发。

use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageHandledEvent;

$subscriber = new Subscriber();

$subscriber->addListener(MessageHandledEvent::class, function (MessageHandledEvent $event) {
    $event->getName(); // Name of the Event
    $event->getMessage(); // Command or Query being handled
    $event->getResult(); // Result for the handled message
});

消息失败事件 - 在消息处理失败并抛出异常时引发。

use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageFailedEvent;

$subscriber = new Subscriber();

$subscriber->addListener(MessageFailedEvent::class, function (MessageFailedEvent $event) {
    $event->getName(); // Name of the Event
    $event->getMessage(); // Command or Query being handled
    $event->getError(); // Captured Exception
});

事务

事务中间件接受三个函数参数,每个参数对应事务的每个阶段:开始、提交和回滚。采用此方法允许您使用您喜欢的任何ORM,甚至使用原生的PDO对象与持久层交互。

$pdo = new \PDO('{connection_dsn}')

$transaction = new \Sco\MessageBus\Middleware\TransactionMiddleware(
    fn(): bool => $pdo->beginTransaction(),
    fn(): bool => $pdo->commit(),
    fn(\Throwable $error): bool => $pdo->rollBack(),
);

结果类型

库将处理器的返回值包装到结果值对象中,以提供一致的API,并确保返回值始终为同一类型。

所有结果值对象都扩展了Sco\MessageBus\Result抽象类,可以分为3组

  1. 包装原始值的那一组
    • Sco\MessageBus\Result\Boolean
    • Sco\MessageBus\Result\Integer
    • Sco\MessageBus\Result\Numeric
    • Sco\MessageBus\Result\Text
    • Sco\MessageBus\Result\None(包装null值)
  2. Sco\MessageBus\Result\Delegated,它包装对象并将调用委托给底层对象的方法
  3. Sco\MessageBus\Result\CollectionSco\MessageBus\Result\Map,它们包装数字索引数组(列表)和字符串索引数组(映射),并实现\Countable\ArrayAccess\IteratorAggregate接口

您还可以通过扩展抽象类Sco\MessageBus\Result并返回它们来添加自己的自定义结果值对象。

贡献

风格指南

库遵循PSR-12标准

待办事项

  1. 添加PSR Cache接口和实现以缓存结果