makinacorpus/corebus

命令和事件总线接口及逻辑。


README

基于消息架构的项目的离散命令总线和服务域事件调度器接口。

离散意味着您的域代码将不会受到此组件硬依赖的影响,除了用于定位命令处理方法和事件监听方法属性外。您的域业务代码仍然没有依赖。

事件总线功能

  • 内部同步事件调度器。
  • 基于PHP属性的监听器定位器。
  • 监听器定位器快速丢弃PHP缓存。
  • 外部事件调度器(尚未启用)。

命令总线功能

  • 事务性同步命令总线,可处理您的交易。
  • 交易期间的事件缓冲,仅在交易成功时将事件刷新到外部事件调度器。
  • 基于PHP属性的命令处理程序定位器。
  • 命令处理程序定位器快速丢弃PHP缓存。
  • 使用makinacorpus/goat-query的默认事务实现。
  • 基于makinacorpus/message-broker消息代理接口的命令异步调度器实现。

其他各种功能

  • 用于在CLI中消费异步事件的Worker对象。
  • 包含命令总线Worker的控制台命令在内的Symfony集成。
  • 全局属性,用于方面驱动的域代码配置。
  • 简单的命令总线接口。

设计

基本设计

您的应用程序预期运行时流程如下

  • 命令可以被调度以触发系统中的写入。
  • 命令总是异步处理的,它们可以返回一个响应。
  • 一个命令意味着在您的数据库后端上有一个事务。
  • 在单个命令处理期间,域代码可能会引发一个或多个域事件。
  • 域事件总是在触发命令事务的域代码中同步调度。

在整个命令处理过程中,如果后端允许,数据库事务将被隔离。提交是全部或无,包括在处理过程中发出的事件和监听器执行。

事务和事件缓冲

事务处理将在实现中完全隐藏,您的业务代码永远不会看到它,下面是如何工作的

  • 域事件在发出和内部调度过程中,将沿途存储到一个易失性的内存临时缓冲区。
  • 一旦命令被消费并且任务结束,事务将提交。
  • 在成功的情况下,缓冲区将被刷新,事件可以发送到总线,以便外部应用程序监听。
  • 在失败的情况下,事务回滚,事件缓冲区被清空,事件在进一步操作之前被丢弃。

可以在每个命令的基础上禁用事务,使用命令类的PHP属性。

可选的事件存储

如果您的项目需要,您可以在事件调度器上连接一个事件存储。有两种选择

  • 连接到内部事件调度器,事件将沿途存储,这要求事件存储与您的域存储库使用相同的数据库事务,即连接。
  • 连接到事件缓冲区输出,这意味着事件将在提交后存储,不再存在一致性问题,但如果事件存储过程失败,您将丢失历史记录。

命令的内部工作流程

命令调度器

命令调度器是面向用户的命令总线。它有两种现有的变体

  • MakinaCorpus\CoreBus\CommandBus\CommandBus 是默认的命令总线接口。

  • MakinaCorpus\CoreBus\CommandBus\SynchronousCommandbus,它扩展了前者。它与前者完全相同,但存在是为了依赖注入的目的:使用同步命令总线将始终直接将命令从调度器传递到消费者,这将使消息被同步消费。

所有实现都实现了这两个接口,依赖注入组件有责任区分同步和异步实现。

这也是命令访问控制将通过装饰器发生的地方,如本文档稍后所述。

命令消费者

命令消费者是本地实现,它从给定的命令中执行它。您可以找到多个实现

  • MakinaCorpus\CoreBus\CommandBus\Consumer\DefaultCommandConsumer 是实际实现,它执行处理函数查找并执行它。

  • MakinaCorpus\CoreBus\CommandBus\Consumer\TransactionalCommandConsumer 是一个实现,它将装饰默认消费者,并将处理函数执行包装在一个单独的数据库事务中,沿途引发领域事件,并在出现任何错误时回滚。

  • MakinaCorpus\CoreBus\CommandBus\Consumer\NullCommandConsumer 仅在单元测试中使用,请忽略该实现。如果出于任何原因需要模拟实现,它可能对您有用。

目前还有一个尚未通用的步骤,即从队列中获取消息并将其发送到消息消费者的命令总线工作进程:它与 makinacorpus/message-broker 包硬编码。

实现

提供了两种实现

  • 内存总线,以及空事务处理(根本不进行事务)非常适合原型设计和单元测试。
  • 使用 makinacorpus/goat 作为消息代理和 makinacorpus/goat-query 作为事务处理(使用相同的数据库连接)的 PostgreSQL 总线实现,可靠并保证数据一致性。

所有内容都隐藏在接口后面,不同的实现容易实现。您的项目不需要选择这些实现中的任何一个,相反,鼓励实现自己的。

设置

独立

目前没有独立的设置指南。请参考提供的 Symfony 配置以获取具体示例。

Symfony

只需在您的 config/bundles.php 文件中启用捆绑包

return [
    // ... your other bunbles.
    MakinaCorpus\CoreBus\Bridge\Symfony\CoreBusBundle::class => ['all' => true],
];

然后,将 src/Bridge/Symfony/Resources/example/corebus.sample.yaml 文件剪切并粘贴到您的 config/packages/ 文件夹中,并对其进行编辑。

用法

命令和事件

命令是普通的 PHP 对象,不需要任何依赖。

只需编写一个数据传输对象

namespace App\Domain\SomeBusiness\Command;

final class SayHelloCommand
{
    public readonly string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

与事件相同,所以只需写

namespace App\Domain\SomeBusiness\Event;

final class HelloWasSaidEvent
{
    public readonly string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

使用基类注册处理程序

绑定单个命令处理程序

namespace App\Domain\SomeBusiness\Handler;

use MakinaCorpus\CoreBus\CommandBus\AbstractCommandHandler;

final class SayHelloHandler extends AbstractCommandHandler
{
    /*
     * Method name is yours, you may have more than one handler in the
     * same class, do you as wish. Only important thing is to implement
     * the Handler interface (here via the AbstractHandler class).
     */
    public function do(SayHelloCommand $command)
    {
        echo "Hello, ", $command->name, "\n";

        $this->notifyEvent(new HelloWasSaidEvent($command->name));
    }
}

请注意,使用 AbstractCommandHandler 基类完全是可选的,它只是一个辅助程序,可以在处理程序内部使用事件调度器和命令总线。

或者,如果您不需要这些,您可以

  • 或者在类上设置 #[MakinaCorpus\CoreBus\Attr\CommandHandler] 属性,在这种情况下,其所有方法都将被视为处理程序。
  • 或者在作为处理程序的方法上设置 #[MakinaCorpus\CoreBus\Attr\CommandHandler] 属性。

您还可以编写尽可能多的事件监听器,然后甚至可以自己触发事件

namespace App\Domain\SomeBusiness\Listener;

use MakinaCorpus\CoreBus\EventBus\EventListener;

final class SayHelloListener implements EventListener
{
    /*
     * Method name is yours, you may have more than one handler in the
     * same class, do you as wish. Only important thing is to implement
     * the EventListener interface.
     */
    public function on(HelloWasSaidEvent $event)
    {
        $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]);
    }
}

与事件监听器相同,基类只是在这里提供帮助,但不是必需的,您只需

  • 或者在类上设置 #[MakinaCorpus\CoreBus\Attr\EventListener] 属性,在这种情况下,其所有方法都将被视为监听器。
  • 或者在作为监听器的每个方法上设置 #[MakinaCorpus\CoreBus\Attr\EventListener] 属性。

这要求您的服务为容器所知。您有三个不同的选项。

第一个选项是 Symfony 的默认选项,自动配置您所有的服务

services:
    _defaults:
        autowire: true
        autoconfigure: true
        public: false

    everything:
        namespace: App\Domain\
        resource: '../src/Domain/*'

或者如果您想做得更微妙

services:
    _defaults:
        autowire: true
        autoconfigure: true
        public: false

    handler_listener:
        namespace: App\Domain\
        resource: '../src/Domain/*/{Handler,Listener}'

或者如果您想使用旧方法

services:
    App\Domain\SomeBusiness\Handler\SayHelloHandler: ~
    App\Domain\SomeBusiness\Listener\SayHelloListener: ~

在任何情况下,只要您扩展基本类或使用属性,您都不需要任何标签或其他元数据。

使用属性注册处理程序

绑定单个命令处理程序

namespace App\Domain\SomeBusiness\Handler;

use MakinaCorpus\CoreBus\EventBus\EventBusAware;
use MakinaCorpus\CoreBus\EventBus\EventBusAwareTrait;

final class SayHelloHandler implements EventBusAware
{
    use EventBusAwareTrait;

    /*
     * Method name is yours, you may have more than one handler in the
     * same class, do you as wish. Only important thing is to implement
     * the Handler interface (here via the AbstractHandler class).
     */
    #[MakinaCorpus\CoreBus\Attr\CommandHandler]
    public function do(SayHelloCommand $command)
    {
        echo "Hello, ", $command->name, "\n";

        $this->notifyEvent(new HelloWasSaidEvent($command->name));
    }
}

您还可以编写尽可能多的事件监听器,然后甚至可以自己触发事件

namespace App\Domain\SomeBusiness\Listener;

final class SayHello
{
    /*
     * Method name is yours, you may have more than one handler in the
     * same class, do you as wish. Only important thing is to implement
     * the EventListener interface.
     */
    #[MakinaCorpus\CoreBus\Attr\EventListener]
    public function on(HelloWasSaidEvent $event)
    {
        $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]);
    }
}

使用Symfony容器机制,无需配置即可使用此功能。

Symfony命令

将消息推送到总线

推送消息非常简单

bin/console corebus:push CommandName <<'EOT'
{
    "message": "contents"
}
EOT

运行工作进程

运行工作进程非常简单

bin/console corebus:worker -v

如果您设置-vv,您将获得非常详细的输出,在除开发机器以外的任何环境中使用都是一个非常糟糕的想法。

使用-v运行将输出每条正在消费的消息的单独一行,包括一些时间和内存信息。当消息失败时,异常跟踪将完全显示在输出中。这是与systemd或将输出管道传输到日志的docker容器一起使用的良好设置。

未设置任何-v标志将等同于-vv,但输出仅发生在monolog的corebus通道下。

此外,您还可以使用以下选项进行调整

  • --limit=X:仅处理X条消息并退出
  • --routing-key=QUEUE_NAME:仅处理QUEUE_NAME队列中的消息
  • --memory-limit=128M:当PHP内存限制超过给定限制时,退出。默认情况下,进程将使用当前PHP限制减去16M,以避免在消息处理期间出现PHP内存限制错误
  • --memory-leak=512K:当单个消息消耗完成后未完全释放内存时,在输出中警告。给定的阈值
  • --sleep-time=X:在没有更多消息要消费之前,在两次消息之间等待X微秒。这可能在未来被某些实现忽略。

使用属性

此软件包提供了对命令和事件进行注释以推断总线行为的属性支持。这允许在不污染领域代码的情况下声明命令或事件行为。

命令属性

  • #[MakinaCorpus\CoreBus\Attr\NoTransaction]禁用命令的事务处理。请谨慎使用。

  • #[MakinaCorpus\CoreBus\Attr\RoutingKey(name: string)]允许您通过给定的路由键(或队列名称)路由命令。默认情况下,未指定此属性时为default

  • #[MakinaCorpus\CoreBus\Attr\Async]强制命令始终异步分派。警告:此功能尚未实现,只是一个空壳。

  • #[MakinaCorpus\CoreBus\Attr\Retry(count: ?int)]允许在发生错误时重试命令。第一个参数是允许的重试次数,默认为3。警告:此功能尚未实现,只是一个空壳。

领域事件属性。

  • #[MakinaCorpus\CoreBus\Attr\Aggregate(property: string, type: ?string)]允许开发人员明确地告诉哪个聚合(实体或模型)此事件的目标。第一个参数必须是事件的属性名,它是聚合标识符,第二个参数是可选的,是目标聚合类或逻辑名称。如果您使用事件存储库,聚合类型对于聚合流创建事件是强制性的,标识符足以在现有流中附加事件。

配置属性

  • #[MakinaCorpus\CoreBus\Attr\CommandHandler]如果在一个类上设置,将强制总线检查所有方法并将所有方法注册为命令处理程序;如果在一个单独的方法上设置,将注册此显式方法为命令处理程序。

  • #[MakinaCorpus\CoreBus\Attr\EventListener]如果在一个类上设置,将强制总线检查所有方法并将所有方法注册为事件监听器;如果在一个单独的方法上设置,将注册此显式方法为事件监听器。

对于所有这些属性,参数是可选的,但你可能需要设置target参数来区分处理器或监听器捕获哪个类。使用此参数,你可以使用接口进行匹配而不是具体的类。

命令输入的访问控制

此API允许你在命令输入上提供访问控制。警告:在输入上,一旦命令进入总线,你将无法再对其进行访问控制。

你可以通过实现MakinaCorpus\CoreBus\CommandBus\CommandAuthorizationChecker接口来执行自定义访问控制。

namespace App\Domain\SomeBusiness\AccessControl;

use MakinaCorpus\CoreBus\CommandBus\CommandAuthorizationChecker;

class MyCommandAuthorizationChecker implements CommandAuthorizationChecker
{
    /**
     * {@inheritdoc}
     */
    public function isGranted(object $command): bool
    {
         if ($command instanceof SomeForbiddenCommand) {
             return false;
         }
         return true;
    }
}

对于Symfony包用户,你需要在注册的授权检查服务上设置corebus.authorization_checker标签。

此外,如果你使用与API自己的包一起使用的makinacorpus/access-control Symfony包,它将自动配置。

services:
    App\Domain\SomeBusiness\AccessControl\MyCommandAuthorizationChecker:
        tags: ['corebus.authorization_checker']

在执行CommandBus::dispatch()方法时,会使用装饰器模式进行访问检查。这可能防止你运行任意命令进入总线,因此我们将提供未来的一种回退,以便能够使用不受保护的总线(例如CLI命令)。目前,你需要明智地实现授权检查,以避免意外行为。

公开命令总线HTTP端点

提供的控制器

一个有效的基本控制器实现作为MakinaCorpus\CoreBus\Bridge\Symfony\Controller\CommandController类提供,并提供了三种方法。

由于直接将总线作为HTTP端点公开会带来重要的安全问题,因此配置和安全取决于你。这就是为什么此控制器不会自动配置。

记住,默认配置下,所有命令都将通过CommandAuthorizationChecker,你可以使用它来配置访问权限。

在Symfony中配置控制器

提供了一个示例src/Bridge/Symfony/Resources/example/corebus.routing.yaml文件,但默认情况下不会配置:你必须将其复制/粘贴到你的项目中才能使其工作。

分发命令

简单地在/api/command/dispatch端点路径上发送POSTPUT HTTP请求(你可能已更改此路径)

curl -X POST \
    -H "Content-Type: application/json" \
    -d '{"some": "content"}' \
    'https:///api/command/dispatch?command=SomeCommandName&async=1'

你应该期望以下结果

{
    "status": "queued",
    "queue": "some_queue_name",
    "reply_to": null
}

请注意,如果你传递了&async=1 GET参数,命令将被排队到总线上,并且响应中将包含"status": "queued"

如果你没有指定参数,命令将被同步消费,如果消息成功,则返回"status": "ok"

reply_to参数也可以作为GET参数表示,如果设置为任何值,服务器端将生成一个回复队列名称,并在JSON属性的reply_to中返回。

这允许你使用此队列名称来使用consume端点检索潜在的回复。

分发命令事务

这尚未记录。

从给定队列中消费消息

消费端点主要是为了通过命令总线模式实现RPC方法调用而创建的,因此有以下限制:你只能监听在调用HTTP分发端点时通过设置reply_to参数隐式创建的队列。

发送reply_to参数时,将在服务器端生成一个名称,存储在会话中,并由分发命令在reply_to属性下返回。

示例

同步分发示例

curl -X POST -k -H "Content-Type: application/json" -d '{}' 'https:///api/command/dispatch?command=App\Command\Ping&reply-to=1 | json_pp

这将给出以下结果

{
   "properties" : [],
   "response" : {
      "date" : "2023-03-21 11:38:15.321013"
   },
   "status" : "ok"
}

请注意属性缺失,因为消息通过直接分发,消息中没有附带总线元数据。

请求体response属性包含处理器的响应。

异步分发示例

例如,考虑到App\Command\Ping命令仅返回一个包含一些属性的App\Command\PingResponse对象,让我们将其分发到总线上。

curl -X POST -k -H "Content-Type: application/json" -d '{}' 'https:///api/command/dispatch?command=App\Command\Ping&async=1&reply-to=1 | json_pp

这将给出以下结果

{
   "properties" : {
      "content-type" : "application/json",
      "message-id" : "989bc4db-578b-4aee-a060-bfa26a44487a",
      "reply-to" : "corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9",
      "type" : "App\Command\Ping"
   },
   "status" : "queued"
}

队列响应消费

让我们考虑之前的例子,注意你会在响应的properties属性中找到reply-to属性:这是自动计算出的接收处理程序异步响应的队列名称。

让我们消费队列

curl -X POST -k -H "Content-Type: application/json" 'https:///api/command/consume?queue=corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9' | json_pp

如果命令没有被处理,你会得到以下结果

{
   "status" : "empty"
}

一旦响应被接收并等待在队列中

{
   "properties" : {
      "content-type" : "application/json",
      "message-id" : "821a6146-0816-4387-9404-48ec640b0a67",
      "type" : "SP2.System.Command.PingResponse",
      "x-retry-count" : "0",
      "x-routing-key" : "corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9",
      "x-serial" : "138343"
   },
   "response" : {
      "date" : "2023-03-21 11:44:09.000000"
   },
   "status" : "ok"
}

响应类似于同步分发,除了你还会收到额外的属性元数据。

这个功能仍然是实验性的,目前还没有附加安全措施。

保护消费端点

默认情况下,消费端点不会允许任何任意的队列名称,因为它是为了满足通过命令总线实现RPC的需求而创建的,它只会允许特定的命名模式。

通过HTTP使用总线调用RPC方法

@待办

重写实现

本包中的任何接口都是你将要使用的依赖注入容器中的服务。你可以替换或装饰它们中的任何一个。

路线图

  • 使用makinacorpus/profiling为事件总线实现配置文件装饰器。
  • 使用makinacorpus/profiling为命令总线实现配置文件装饰器。
  • 允许多个消息代理共存,每个队列一个。
  • 实现死信队列路由。
  • 为多个实例创建重试策略链。
  • 使用我们的属性实现重试策略。
  • 可按异常类型配置的重试策略。
  • 为命令处理程序和事件监听器实现参数解析器。