makinacorpus / corebus
命令和事件总线接口及逻辑。
Requires
- php: >=8.0
- makinacorpus/argument-resolver: ^1.0.4
- makinacorpus/message: ^1.1
- psr/log: ^1.0 || ^2.0 || ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.16
- makinacorpus/access-control: ^1.2.2
- makinacorpus/event-store: ^1.0.4
- makinacorpus/goat-query-bundle: ^3.1
- makinacorpus/message-broker: ^2.0.0
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^9.2
- symfony/config: ^5.4 || ^6.0
- symfony/console: ^5.4 || ^6.0
- symfony/dependency-injection: ^5.4 || ^6.0
- symfony/framework-bundle: ^5.4 || ^6.0
- symfony/http-kernel: ^5.4 || ^6.0
- symfony/property-access: ^5.4 || ^6.0
- symfony/security-bundle: ^5.4|^6.0
- symfony/serializer: ^5.4 || ^6.0
- symfony/yaml: ^5.4 || ^6.0
- dev-master
- 2.0.0-alpha10
- 2.0.0-alpha9
- 2.0.0-alpha8
- 2.0.0-alpha7
- 2.0.0-alpha6
- 2.0.0-alpha5
- 2.0.0-alpha4
- 2.0.0-alpha3
- 2.0.0-alpha2
- 2.0.0-alpha1
- 1.0.0-alpha28
- 1.0.0-alpha27
- 1.0.0-alpha26
- 1.0.0-alpha25
- 1.0.0-alpha24
- 1.0.0-alpha23
- 1.0.0-alpha22
- 1.0.0-alpha21
- 1.0.0-alpha20
- 1.0.0-alpha19
- 1.0.0-alpha18
- 1.0.0-alpha17
- 1.0.0-alpha16
- 1.0.0-alpha15
- 1.0.0-alpha14
- 1.0.0-alpha13
- 1.0.0-alpha12
- 1.0.0-alpha11
- 1.0.0-alpha10
- 1.0.0-alpha9
- 1.0.0-alpha8
- 1.0.0-alpha7
- 1.0.0-alpha6
- 1.0.0-alpha5
- 1.0.0-alpha4
- 1.0.0-alpha3
- 1.0.0-alpha2
- 1.0.0-alpha1
- dev-add-doc-on-corebus-push-command
This package is auto-updated.
Last update: 2024-09-08 15:45:14 UTC
README
基于消息架构的项目的离散命令总线和服务域事件调度器接口。
离散意味着您的域代码将不会受到此组件硬依赖的影响,除了用于定位命令处理方法和事件监听方法属性外。您的域业务代码仍然没有依赖。
事件总线功能
- 内部同步事件调度器。
- 基于PHP属性的监听器定位器。
- 监听器定位器快速丢弃PHP缓存。
- 外部事件调度器(尚未启用)。
命令总线功能
- 事务性同步命令总线,可处理您的交易。
- 交易期间的事件缓冲,仅在交易成功时将事件刷新到外部事件调度器。
- 基于PHP属性的命令处理程序定位器。
- 命令处理程序定位器快速丢弃PHP缓存。
- 使用
makinacorpus/goat-query
的默认事务实现。 - 基于
makinacorpus/message-broker
消息代理接口的命令异步调度器实现。
其他各种功能
- 用于在CLI中消费异步事件的Worker对象。
- 包含命令总线Worker的控制台命令在内的Symfony集成。
- 全局属性,用于方面驱动的域代码配置。
- 简单的命令总线接口。
设计
基本设计
您的应用程序预期运行时流程如下
- 命令可以被调度以触发系统中的写入。
- 命令总是异步处理的,它们可以返回一个响应。
- 一个命令意味着在您的数据库后端上有一个事务。
- 在单个命令处理期间,域代码可能会引发一个或多个域事件。
- 域事件总是在触发命令事务的域代码中同步调度。
在整个命令处理过程中,如果后端允许,数据库事务将被隔离。提交是全部或无,包括在处理过程中发出的事件和监听器执行。
事务和事件缓冲
事务处理将在实现中完全隐藏,您的业务代码永远不会看到它,下面是如何工作的
- 域事件在发出和内部调度过程中,将沿途存储到一个易失性的内存临时缓冲区。
- 一旦命令被消费并且任务结束,事务将提交。
- 在成功的情况下,缓冲区将被刷新,事件可以发送到总线,以便外部应用程序监听。
- 在失败的情况下,事务回滚,事件缓冲区被清空,事件在进一步操作之前被丢弃。
可以在每个命令的基础上禁用事务,使用命令类的PHP属性。
可选的事件存储
如果您的项目需要,您可以在事件调度器上连接一个事件存储。有两种选择
- 连接到内部事件调度器,事件将沿途存储,这要求事件存储与您的域存储库使用相同的数据库事务,即连接。
- 连接到事件缓冲区输出,这意味着事件将在提交后存储,不再存在一致性问题,但如果事件存储过程失败,您将丢失历史记录。
命令的内部工作流程
命令调度器
命令调度器是面向用户的命令总线。它有两种现有的变体
-
MakinaCorpus\CoreBus\CommandBus\CommandBus
是默认的命令总线接口。 -
MakinaCorpus\CoreBus\CommandBus\SynchronousCommandbus
,它扩展了前者。它与前者完全相同,但存在是为了依赖注入的目的:使用同步命令总线将始终直接将命令从调度器传递到消费者,这将使消息被同步消费。
所有实现都实现了这两个接口,依赖注入组件有责任区分同步和异步实现。
这也是命令访问控制将通过装饰器发生的地方,如本文档稍后所述。
命令消费者
命令消费者是本地实现,它从给定的命令中执行它。您可以找到多个实现
-
MakinaCorpus\CoreBus\CommandBus\Consumer\DefaultCommandConsumer
是实际实现,它执行处理函数查找并执行它。 -
MakinaCorpus\CoreBus\CommandBus\Consumer\TransactionalCommandConsumer
是一个实现,它将装饰默认消费者,并将处理函数执行包装在一个单独的数据库事务中,沿途引发领域事件,并在出现任何错误时回滚。 -
MakinaCorpus\CoreBus\CommandBus\Consumer\NullCommandConsumer
仅在单元测试中使用,请忽略该实现。如果出于任何原因需要模拟实现,它可能对您有用。
目前还有一个尚未通用的步骤,即从队列中获取消息并将其发送到消息消费者的命令总线工作进程:它与 makinacorpus/message-broker
包硬编码。
实现
提供了两种实现
- 内存总线,以及空事务处理(根本不进行事务)非常适合原型设计和单元测试。
- 使用
makinacorpus/goat
作为消息代理和makinacorpus/goat-query
作为事务处理(使用相同的数据库连接)的 PostgreSQL 总线实现,可靠并保证数据一致性。
所有内容都隐藏在接口后面,不同的实现容易实现。您的项目不需要选择这些实现中的任何一个,相反,鼓励实现自己的。
设置
独立
目前没有独立的设置指南。请参考提供的 Symfony 配置以获取具体示例。
Symfony
只需在您的 config/bundles.php
文件中启用捆绑包
return [ // ... your other bunbles. MakinaCorpus\CoreBus\Bridge\Symfony\CoreBusBundle::class => ['all' => true], ];
然后,将 src/Bridge/Symfony/Resources/example/corebus.sample.yaml
文件剪切并粘贴到您的 config/packages/
文件夹中,并对其进行编辑。
用法
命令和事件
命令是普通的 PHP 对象,不需要任何依赖。
只需编写一个数据传输对象
namespace App\Domain\SomeBusiness\Command; final class SayHelloCommand { public readonly string $name; public function __construct(string $name) { $this->name = $name; } }
与事件相同,所以只需写
namespace App\Domain\SomeBusiness\Event; final class HelloWasSaidEvent { public readonly string $name; public function __construct(string $name) { $this->name = $name; } }
使用基类注册处理程序
绑定单个命令处理程序
namespace App\Domain\SomeBusiness\Handler; use MakinaCorpus\CoreBus\CommandBus\AbstractCommandHandler; final class SayHelloHandler extends AbstractCommandHandler { /* * Method name is yours, you may have more than one handler in the * same class, do you as wish. Only important thing is to implement * the Handler interface (here via the AbstractHandler class). */ public function do(SayHelloCommand $command) { echo "Hello, ", $command->name, "\n"; $this->notifyEvent(new HelloWasSaidEvent($command->name)); } }
请注意,使用 AbstractCommandHandler
基类完全是可选的,它只是一个辅助程序,可以在处理程序内部使用事件调度器和命令总线。
或者,如果您不需要这些,您可以
- 或者在类上设置
#[MakinaCorpus\CoreBus\Attr\CommandHandler]
属性,在这种情况下,其所有方法都将被视为处理程序。 - 或者在作为处理程序的方法上设置
#[MakinaCorpus\CoreBus\Attr\CommandHandler]
属性。
您还可以编写尽可能多的事件监听器,然后甚至可以自己触发事件
namespace App\Domain\SomeBusiness\Listener; use MakinaCorpus\CoreBus\EventBus\EventListener; final class SayHelloListener implements EventListener { /* * Method name is yours, you may have more than one handler in the * same class, do you as wish. Only important thing is to implement * the EventListener interface. */ public function on(HelloWasSaidEvent $event) { $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]); } }
与事件监听器相同,基类只是在这里提供帮助,但不是必需的,您只需
- 或者在类上设置
#[MakinaCorpus\CoreBus\Attr\EventListener]
属性,在这种情况下,其所有方法都将被视为监听器。 - 或者在作为监听器的每个方法上设置
#[MakinaCorpus\CoreBus\Attr\EventListener]
属性。
这要求您的服务为容器所知。您有三个不同的选项。
第一个选项是 Symfony 的默认选项,自动配置您所有的服务
services: _defaults: autowire: true autoconfigure: true public: false everything: namespace: App\Domain\ resource: '../src/Domain/*'
或者如果您想做得更微妙
services: _defaults: autowire: true autoconfigure: true public: false handler_listener: namespace: App\Domain\ resource: '../src/Domain/*/{Handler,Listener}'
或者如果您想使用旧方法
services: App\Domain\SomeBusiness\Handler\SayHelloHandler: ~ App\Domain\SomeBusiness\Listener\SayHelloListener: ~
在任何情况下,只要您扩展基本类或使用属性,您都不需要任何标签或其他元数据。
使用属性注册处理程序
绑定单个命令处理程序
namespace App\Domain\SomeBusiness\Handler; use MakinaCorpus\CoreBus\EventBus\EventBusAware; use MakinaCorpus\CoreBus\EventBus\EventBusAwareTrait; final class SayHelloHandler implements EventBusAware { use EventBusAwareTrait; /* * Method name is yours, you may have more than one handler in the * same class, do you as wish. Only important thing is to implement * the Handler interface (here via the AbstractHandler class). */ #[MakinaCorpus\CoreBus\Attr\CommandHandler] public function do(SayHelloCommand $command) { echo "Hello, ", $command->name, "\n"; $this->notifyEvent(new HelloWasSaidEvent($command->name)); } }
您还可以编写尽可能多的事件监听器,然后甚至可以自己触发事件
namespace App\Domain\SomeBusiness\Listener; final class SayHello { /* * Method name is yours, you may have more than one handler in the * same class, do you as wish. Only important thing is to implement * the EventListener interface. */ #[MakinaCorpus\CoreBus\Attr\EventListener] public function on(HelloWasSaidEvent $event) { $this->logger->debug("Hello was said to {name}.", ['name' => $event->name]); } }
使用Symfony容器机制,无需配置即可使用此功能。
Symfony命令
将消息推送到总线
推送消息非常简单
bin/console corebus:push CommandName <<'EOT' { "message": "contents" } EOT
运行工作进程
运行工作进程非常简单
bin/console corebus:worker -v
如果您设置-vv
,您将获得非常详细的输出,在除开发机器以外的任何环境中使用都是一个非常糟糕的想法。
使用-v
运行将输出每条正在消费的消息的单独一行,包括一些时间和内存信息。当消息失败时,异常跟踪将完全显示在输出中。这是与systemd
或将输出管道传输到日志的docker
容器一起使用的良好设置。
未设置任何-v
标志将等同于-vv
,但输出仅发生在monolog的corebus
通道下。
此外,您还可以使用以下选项进行调整
--limit=X
:仅处理X条消息并退出--routing-key=QUEUE_NAME
:仅处理QUEUE_NAME
队列中的消息--memory-limit=128M
:当PHP内存限制超过给定限制时,退出。默认情况下,进程将使用当前PHP限制减去16M,以避免在消息处理期间出现PHP内存限制错误--memory-leak=512K
:当单个消息消耗完成后未完全释放内存时,在输出中警告。给定的阈值--sleep-time=X
:在没有更多消息要消费之前,在两次消息之间等待X微秒。这可能在未来被某些实现忽略。
使用属性
此软件包提供了对命令和事件进行注释以推断总线行为的属性支持。这允许在不污染领域代码的情况下声明命令或事件行为。
命令属性
-
#[MakinaCorpus\CoreBus\Attr\NoTransaction]
禁用命令的事务处理。请谨慎使用。 -
#[MakinaCorpus\CoreBus\Attr\RoutingKey(name: string)]
允许您通过给定的路由键(或队列名称)路由命令。默认情况下,未指定此属性时为default
。 -
#[MakinaCorpus\CoreBus\Attr\Async]
强制命令始终异步分派。警告:此功能尚未实现,只是一个空壳。 -
#[MakinaCorpus\CoreBus\Attr\Retry(count: ?int)]
允许在发生错误时重试命令。第一个参数是允许的重试次数,默认为3
。警告:此功能尚未实现,只是一个空壳。
领域事件属性。
#[MakinaCorpus\CoreBus\Attr\Aggregate(property: string, type: ?string)]
允许开发人员明确地告诉哪个聚合(实体或模型)此事件的目标。第一个参数必须是事件的属性名,它是聚合标识符,第二个参数是可选的,是目标聚合类或逻辑名称。如果您使用事件存储库,聚合类型对于聚合流创建事件是强制性的,标识符足以在现有流中附加事件。
配置属性
-
#[MakinaCorpus\CoreBus\Attr\CommandHandler]
如果在一个类上设置,将强制总线检查所有方法并将所有方法注册为命令处理程序;如果在一个单独的方法上设置,将注册此显式方法为命令处理程序。 -
#[MakinaCorpus\CoreBus\Attr\EventListener]
如果在一个类上设置,将强制总线检查所有方法并将所有方法注册为事件监听器;如果在一个单独的方法上设置,将注册此显式方法为事件监听器。
对于所有这些属性,参数是可选的,但你可能需要设置target
参数来区分处理器或监听器捕获哪个类。使用此参数,你可以使用接口进行匹配而不是具体的类。
命令输入的访问控制
此API允许你在命令输入上提供访问控制。警告:在输入上,一旦命令进入总线,你将无法再对其进行访问控制。
你可以通过实现MakinaCorpus\CoreBus\CommandBus\CommandAuthorizationChecker
接口来执行自定义访问控制。
namespace App\Domain\SomeBusiness\AccessControl; use MakinaCorpus\CoreBus\CommandBus\CommandAuthorizationChecker; class MyCommandAuthorizationChecker implements CommandAuthorizationChecker { /** * {@inheritdoc} */ public function isGranted(object $command): bool { if ($command instanceof SomeForbiddenCommand) { return false; } return true; } }
对于Symfony包用户,你需要在注册的授权检查服务上设置corebus.authorization_checker
标签。
此外,如果你使用与API自己的包一起使用的makinacorpus/access-control
Symfony包,它将自动配置。
services: App\Domain\SomeBusiness\AccessControl\MyCommandAuthorizationChecker: tags: ['corebus.authorization_checker']
在执行CommandBus::dispatch()
方法时,会使用装饰器模式进行访问检查。这可能防止你运行任意命令进入总线,因此我们将提供未来的一种回退,以便能够使用不受保护的总线(例如CLI命令)。目前,你需要明智地实现授权检查,以避免意外行为。
公开命令总线HTTP端点
提供的控制器
一个有效的基本控制器实现作为MakinaCorpus\CoreBus\Bridge\Symfony\Controller\CommandController
类提供,并提供了三种方法。
由于直接将总线作为HTTP端点公开会带来重要的安全问题,因此配置和安全取决于你。这就是为什么此控制器不会自动配置。
记住,默认配置下,所有命令都将通过CommandAuthorizationChecker
,你可以使用它来配置访问权限。
在Symfony中配置控制器
提供了一个示例src/Bridge/Symfony/Resources/example/corebus.routing.yaml
文件,但默认情况下不会配置:你必须将其复制/粘贴到你的项目中才能使其工作。
分发命令
简单地在/api/command/dispatch
端点路径上发送POST
或PUT
HTTP请求(你可能已更改此路径)
curl -X POST \ -H "Content-Type: application/json" \ -d '{"some": "content"}' \ 'https:///api/command/dispatch?command=SomeCommandName&async=1'
你应该期望以下结果
{ "status": "queued", "queue": "some_queue_name", "reply_to": null }
请注意,如果你传递了&async=1
GET参数,命令将被排队到总线上,并且响应中将包含"status": "queued"
。
如果你没有指定参数,命令将被同步消费,如果消息成功,则返回"status": "ok"
。
reply_to
参数也可以作为GET参数表示,如果设置为任何值,服务器端将生成一个回复队列名称,并在JSON属性的reply_to
中返回。
这允许你使用此队列名称来使用consume
端点检索潜在的回复。
分发命令事务
这尚未记录。
从给定队列中消费消息
消费端点主要是为了通过命令总线模式实现RPC方法调用而创建的,因此有以下限制:你只能监听在调用HTTP分发端点时通过设置reply_to
参数隐式创建的队列。
发送reply_to
参数时,将在服务器端生成一个名称,存储在会话中,并由分发命令在reply_to
属性下返回。
示例
同步分发示例
curl -X POST -k -H "Content-Type: application/json" -d '{}' 'https:///api/command/dispatch?command=App\Command\Ping&reply-to=1 | json_pp
这将给出以下结果
{ "properties" : [], "response" : { "date" : "2023-03-21 11:38:15.321013" }, "status" : "ok" }
请注意属性缺失,因为消息通过直接分发,消息中没有附带总线元数据。
请求体response
属性包含处理器的响应。
异步分发示例
例如,考虑到App\Command\Ping
命令仅返回一个包含一些属性的App\Command\PingResponse
对象,让我们将其分发到总线上。
curl -X POST -k -H "Content-Type: application/json" -d '{}' 'https:///api/command/dispatch?command=App\Command\Ping&async=1&reply-to=1 | json_pp
这将给出以下结果
{ "properties" : { "content-type" : "application/json", "message-id" : "989bc4db-578b-4aee-a060-bfa26a44487a", "reply-to" : "corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9", "type" : "App\Command\Ping" }, "status" : "queued" }
队列响应消费
让我们考虑之前的例子,注意你会在响应的properties
属性中找到reply-to
属性:这是自动计算出的接收处理程序异步响应的队列名称。
让我们消费队列
curl -X POST -k -H "Content-Type: application/json" 'https:///api/command/consume?queue=corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9' | json_pp
如果命令没有被处理,你会得到以下结果
{ "status" : "empty" }
一旦响应被接收并等待在队列中
{ "properties" : { "content-type" : "application/json", "message-id" : "821a6146-0816-4387-9404-48ec640b0a67", "type" : "SP2.System.Command.PingResponse", "x-retry-count" : "0", "x-routing-key" : "corebus.reply-to.c88d2f9f-37f4-46f3-8b08-52f9c5a79eb9", "x-serial" : "138343" }, "response" : { "date" : "2023-03-21 11:44:09.000000" }, "status" : "ok" }
响应类似于同步分发,除了你还会收到额外的属性元数据。
这个功能仍然是实验性的,目前还没有附加安全措施。
保护消费端点
默认情况下,消费端点不会允许任何任意的队列名称,因为它是为了满足通过命令总线实现RPC的需求而创建的,它只会允许特定的命名模式。
通过HTTP使用总线调用RPC方法
@待办
重写实现
本包中的任何接口都是你将要使用的依赖注入容器中的服务。你可以替换或装饰它们中的任何一个。
路线图
- 使用
makinacorpus/profiling
为事件总线实现配置文件装饰器。 - 使用
makinacorpus/profiling
为命令总线实现配置文件装饰器。 - 允许多个消息代理共存,每个队列一个。
- 实现死信队列路由。
- 为多个实例创建重试策略链。
- 使用我们的属性实现重试策略。
- 可按异常类型配置的重试策略。
- 为命令处理程序和事件监听器实现参数解析器。