milhojas / messaging
适用于应用的通讯组件,包含 CommandBus、EventBus 和 QueryBus
Requires
- php: ^5.5 || ^7.0
- container-interop/container-interop: ^1.1
- symfony/dependency-injection: ^3.2
Requires (Dev)
- phpspec/phpspec: ^3.2
- phpunit/phpunit: ^5.7
README
适用于应用的通讯组件,包含 CommandBus、EventBus 和 QueryBus
Milhojas Messaging 是一个提供 PHP 应用内部通讯的组件。
它简单但非常灵活,易于扩展。
入门指南
什么是消息总线?
如果您想编写一个真正解耦的应用程序,其中层和组件是隔离的,您需要一种方式让组件之间进行通讯,而不需要相互深入了解。使用消息总线是解决这个问题的一个好方法。
消息总线是一个组件,用于在应用程序的不同部分之间传递消息。从长远来看,组件依赖于消息总线,但这可以被认为是可以接受的权衡。它是您应用程序的骨干通道。
根据 Verraes 的说法,消息是“系统之间的通信单元”。有三种类型:
- 命令消息:指示接收者执行操作或更改其状态。通常,我们将其建模为命令,接收者是命令处理器。这是一个一对一的关系,这意味着对于每个命令都有一个唯一的命令处理器来处理它。命令处理器不返回状态。状态的更改应使用查询或监听事件来确认。如果命令失败,则应抛出异常。
- 询问消息:向系统询问有关其状态的信息。我们将其建模为查询,接收者是查询处理器。它们类似于命令,除了两点:查询不更改状态(仅报告状态)并且它们必须返回响应,或者在无法获取数据时抛出异常。
- 信息性消息:系统的一部分传达关于自己的某些信息。我们将其建模为事件。事件由经历变化的系统部分(通常是命令的结果)引发,并且可能对应用程序中的许多其他组件都很有趣,我们称之为监听器。一个或多个监听器(或没有任何监听器)可以等待事件分发并对其采取行动。
因此,消息总线是让您将这些消息移动到应用程序核心的一种方式。
多个消息总线
在我们的实现中,根据三种消息类型,有三个消息总线:
- 命令总线,用于将命令发送到正确的处理器。
- 查询总线,用于将查询发送到可以回答它们的处理器。
- 事件总线,用于将事件发送到等待它们的监听器。
命令总线
命令总线将命令传递给其处理器。
命令是一个不可变的数据传输对象,它携带执行操作所需的数据到处理器。命令实现了从消息接口扩展而来的 Command 接口。
命令处理器是一个使用这些信息执行任务的对象。它实现了 CommandHandler 接口,因此它必须实现一个 handle(Command $command) 方法。命令处理器不应返回任何内容,但它们可以引发事件,或者系统本身可以引发事件。如果出现问题,则应抛出异常。
查询总线
查询总线将查询传递给正确的处理器,并返回响应。
查询是一个不可变的数据传输对象,它携带执行查询和获取所需信息所需的数据。查询实现了从消息接口扩展而来的 Query 接口。
查询处理器是一个对象,它使用这些信息执行其任务并从系统获取响应。它实现了QueryHandler接口,因此必须实现一个返回响应的answer(Query $query)方法。如果响应不可用,则应抛出异常,但如果没有可检索的信息,则可以返回一个空响应。例如:获取一个不存在的用户应抛出异常,但尝试查找在特定日期发布的多个帖子,如果没有这样的帖子,则可以返回一个空响应。
事件总线
事件总线将事件分发给对它们感兴趣的监听者(如果有)。
事件是一个不可变的数据传输对象,它携带与系统某个部分最近发生的更改或事件的有关数据。事件实现了从消息接口扩展而来的Event接口,并强制事件应该有一个返回事件标识名称的getName()方法。
监听器是一个使用事件信息执行某些任务的对象。它实现了Listener接口,因此必须实现一个handle(Event $event)方法。如果没有监听器来处理事件,总线必须静默失败。监听器可能会失败,但它们不应中断事件流程。
它们是如何工作的
消息总线类是非常简单的。它们将所有困难的工作委托给工作者或管道。工作者是一个处理接收到的消息的对象。管道是一组按顺序接收相同消息的工作者。实际上,工作者和管道都实现了Worker接口,因此它们可以互换。
在其他消息总线实现中,工作者被称为中间件。我不喜欢中间件命名约定。工作者意味着“这件事做工作”,而中间件意味着“这件事处于中间”。
因此,消息总线至少需要一个工作者才能,嗯,工作。实际上,每种类型的总线实际上都需要至少一个特定的工人来执行其主要任务(将消息发送到相应的处理器)。
- CommandBus需要ExecuteWorker。
- QueryBus需要QueryWorker。
- EventBus需要DispatcherWorker。
这听起来可能有些奇怪,但它给我们带来了一些优势
- 基本工作者实现可以替换,因此您可以从提供的实现开始,并在将来用您自己的实现替换它,前提是它实现了工作者接口。
- 您可以管道化多个工作者,如LoggerWorker或DispatchEventsWorker(由本包提供),在处理消息之前或之后执行多个任务。这带来了很大的灵活性。您可以编写自己的工作者或当前工作者的另一个实现。
- 一些工作者可以在总线之间共享。例如,同一个LoggerWorker实例可以被三个总线中的任何一个使用。
## 认识总线
CommandBus
CommandBus是与CommandHandlers通信的类。它需要一个至少通过构造函数注入的ExecuteWorker,但ExecuteWorker也需要一些合作者。因此,我们将首先回顾它们。
ExecuteWorker
ExecuteWorker接收一个命令,猜测CommandHandler的全限定名(FQCN)使用一个Inflector,并通过一个Loader加载它,该Loader通常是一个依赖注入容器的适配器。
因此,要实例化一个ExcuteWorker,我们首先需要一个Inflector和一个Loader。
Inflector
Milhojas Messaging附带了一个简单的Inflector类和一个Loader类。
Inflector类使用一个约定将命令的FQCN转换为标识依赖注入容器(DIC)中CommandHandler的键,这样Loader就可以获取它。提供的实现是ContainerInflector。
我通常将相关的命令分组在上下文中(想想DDD中的有界上下文),文件夹和文件的组织反映了这一点。例如,给定一个FQN(完全限定名)为
\Milhojas\Application\Context\Command\RegisterUser
的命令,Inflector应该解析出处理器的键为
context.register_user.handler
我将在同一文件夹中打包命令和命令处理器,因此示例中的相应处理器将是
\Milhojas\Application\Context\Command\RegisterUserHandler
而在DIC(依赖注入容器)中的名称将是
context.register_user.handler
明白了吗?
实例化Inflector的代码
use Milhojas\Messaging\Shared\Inflector\ContainerInflector;
$inflector = new ContainerInflector();
所以如果你想使用ContainerLoader,你应该为你的处理器创建一个DIC键。
加载器
加载器需要注入DI容器的一个适配器。任何Interop/ContainerInterface实现都应没问题。其他容器可能需要适配器。Milhojas Messaging提供了一个使用ContainerAware特质的SymfonyContainer适配器。
代码
use Milhojas\Messaging\Shared\Loader\ContainerLoader;
use Milhojas\Messaging\Shared\Loader\Container\SymfonyContainer;
$loader = new ContainerLoader(new SymfonyContainer);
等等。为什么你需要一个容器?
消息处理器是面向任务的小型对象,但通常它们需要多个协作者和参数才能执行。实例化它们最好的方式是通过依赖注入容器,总线需要实例化处理器才能工作。
实际上,你应该使用DIC来构建处理器和总线本身。
回到Worker
现在我们有ExecuteWorker的协作者,是时候实例化它了
use Milhojas\Messaging\CommandBus\Worker\ExecuteWorker;
$executeWorker = new ExecuteWorker($inflector, $loader);
回到总线
这次我们将只使用基本的工作者,并将管道留到另一天。为了构建CommandBus,我们需要以下代码
use Milhojas\Messaging\CommandBus\Worker\ExecuteWorker;
$commandBus = new CommandBus($executeWorker);
完整的代码应该如下所示
use Milhojas\Messaging\Shared\Inflector\ContainerInflector;
use Milhojas\Messaging\Shared\Loader\ContainerLoader;
use Milhojas\Messaging\Shared\Loader\Container\SymfonyContainer;
use Milhojas\Messaging\CommandBus\Worker\ExecuteWorker;
use Milhojas\Messaging\CommandBus\Worker\ExecuteWorker;
$inflector = new ContainerInflector();
$loader = new ContainerLoader(new SymfonyContainer);
$executeWorker = new ExecuteWorker($inflector, $loader);
$commandBus = new CommandBus($executeWorker);
命令和命令处理器
为了给CommandBus的生命赋予意义,你需要一些命令和命令处理器。它们编写起来相当简单。
命令模式是一个经典的模式,用于封装一个命令及其所需的数据以执行。传统上,命令模式是通过一个具有execute方法的唯一类实现的。然而,当前的方法包括两个类,一个简单的数据对象和一个独立的手动器,应用单一责任原则。这带来了几个优势。例如,注入处理器可能需要的依赖项变得更容易,而且你只需要传递一个参数(命令)到处理器的execute方法,所以所有处理器实现了一个唯一的接口。
更多文档即将发布...