nilportugues / messagebus
MessageBus 库。包含 CommandBus、QueryBus 和 EventBus 以及多个中间件实现。
Requires
- php: >=7
- container-interop/container-interop: ^1.2
- nilportugues/assert: ^1.0
- nilportugues/serializer: ^1.2
- psr/cache: ^1.0
- psr/container: ^1.0
- psr/log: ^1.0
Requires (Dev)
- fabpot/php-cs-fixer: ~1.9
- phpunit/phpunit: 5.2.*
- symfony/dependency-injection: ~2.7|~3.0
README
PHP 7 和使用 PSR-11 的 CommandBus、QueryBus 和 EventBus 的实现。
安装
使用 Composer 安装此包
$ composer require nilportugues/messagebus
简介
消息总线的基本思想是创建代表应用程序要执行的操作的消息对象。然后,将其扔入总线,总线确保消息对象到达所需的位置。
很简单,继续阅读!
什么是消息总线?
消息总线是消息的管道。此实现负责处理三种类型的消息,命令、查询和事件。虽然它们看起来很相似,但它们的意图是不同的。
- 命令:其意图是对系统下达指令并修改其当前状态。用户不期望任何响应。
- 事件:其意图是表达系统中已经发生的事情并记录它。用户不期望任何响应。
- 查询:其意图是向系统提出问题。用户期望有响应。
从这个分类中,我们可以看出命令和事件可以很好地一起工作。
它的好处是什么?
鉴于消息的性质,实现一个接口,您可以编写封装消息以进行记录、处理或修改响应的行为,这称为 中间件。
例如
- 实现基于任务的用户界面,您可以轻松地将概念映射到命令、查询和事件。
- 它允许您轻松编写一个日志系统来了解正在进行什么,无论是命令、查询还是事件。这是可能的。
总结一下,它的好处包括
- 鼓励关注点的分离。
- 鼓励单一职责设计。
1. CommandBus
1.1 - 使用方法
1.1.1 - 创建一个命令
<?php use NilPortugues\MessageBus\CommandBus\Contracts\Command; final class RegisterUser implements Command { private $username; private $password; private $emailAddress; public function __construct(string $username, string $password, string $emailAddress) { $this->username = $username; $this->password = $password; $this->emailAddress = $emailAddress; } //getters... }
1.1.2 - 创建一个 CommandHandler
Command Handler 必须实现 CommandHandler
接口并实现 __invoke
方法。
例如
<?php use NilPortugues\MessageBus\CommandBus\Contracts\CommandHandler; use NilPortugues\MessageBus\CommandBus\Contracts\Command; final class RegisterUserHandler implements CommandHandler { private $userRepository; public function __construct($userRepository) { $this->userRepository = $userRepository; } public function __invoke(RegisterUser $command) { $user = new User( $command->getUsername(), $command->getPassword(), $command->getEmail(), ); $this->userRepository->save($user); } }
1.1.3 - 注册 CommandHandler
我假设您正在使用某种服务容器。现在是时候注册您的CommandHandler了。
例如,在一个符合Psr\Container
规范的服务容器中,我们可以这样做:
<?php //...your other registered classes $container['RegisterUserHandler'] = function($container) { return new RegisterUserHandler($container->get('UserRepository'); };
1.1.4 - 设置 CommandBus
Command Bus Middleware需要注入两个类。第一个是命令翻译器,第二个是处理器解析器。
CommandTranslator
实现此接口的类将为给定的命令提供处理器类的完全限定名。
本包提供了一个实现,NilPortugues\MessageBus\CommandBus\Translator\AppendStrategy
,它基本上将单词Handler
追加到提供的Command
类。
对于自定义策略,您可以编写自己的实现,实现NilPortugues\MessageBus\CommandBus\Contracts\CommandTranslator
接口。
CommandHandlerResolver
实现此接口的类将根据使用的命令翻译器的输出解析所需的类。
本包提供了一个实现,NilPortugues\MessageBus\CommandBus\Resolver\PsrContainerResolver
,它期望任何实现Psr\Container
接口的服务容器。
对于Symfony 2和3框架用户,截至版本3.2,您应使用Symfony容器:NilPortugues\MessageBus\CommandBus\Resolver\SymfonyContainerResolver
。对于3.3及以上版本,使用PSR-11 ContainerResolver类。
1.1.5 - 注册剩余的 CommandBus 类
使Command Bus正常工作的最小设置是:
<?php //...your other registered classes $container['CommandTranslator'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\Translator\AppendStrategy('Handler'); }; $container['CommandHandlerResolver'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\Resolver\PsrContainerResolver($container); }; $container['CommandBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\CommandBusMiddleware( $container->get('CommandTranslator'), $container->get('CommandHandlerResolver'), ); }; $container['CommandBus'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\CommandBus([ $container->get('CommandBusMiddleware'), ]); };
例如,如果我们想在Command Bus中记录所有发生的事情,我们将向中间件列表中添加日志记录中间件。这将包装Command Bus,能够在它运行前后记录,并在发生错误时。
<?php //...your other registered classes $container['LoggerCommandBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\LoggerCommandBusMiddleware( $container->get('Monolog') ); }; //Update the CommandBus with the LoggerCommandBusMiddleware $container['CommandBus'] = function($container) { return new \NilPortugues\MessageBus\CommandBus\CommandBus([ $container->get('LoggerCommandBusMiddleware'), $container->get('CommandBusMiddleware'), ]); };
1.1.6 - 运行 CommandBus
最后,为了使用CommandBus,您只需要运行以下代码:
<?php $commandBus = $container->get('CommandBus'); $command = new RegisterUser('MyUsername', 'MySecretPassword', 'hello@example.com'); $commandBus($command);
1.2 - 预定义中间件
TransactionalCommandBusMiddleware
- 类:
NilPortugues\MessageBus\CommandBus\TransactionalCommandBusMiddleware
- 类构造方法期望一个PDO连接。它将使用
beginTransaction-commit
和rollback
包装所有底层中间件的调用,如果抛出任何类型的异常。
LoggerQueryBusMiddleware
- 类:
NilPortugues\MessageBus\CommandBus\LoggerCommandBusMiddleware
- 类构造方法期望一个PSR3 Logger实现。
1.3 - 自定义中间件
要编写自定义中间件,需要创建一个新的类,实现NilPortugues\MessageBus\CommandBus\Contracts\CommandBusMiddleware
接口。
2. QueryBus
2.1 - 使用方法
2.1.1 - 创建一个查询
<?php use NilPortugues\MessageBus\QueryBus\Contracts\Query; final class GetUser implements Query { private $userId; public function __construct(string $userId) { $this->userId = $userId; } public function getUserId() : string { return $this->userId; } }
2.1.2 - 创建一个 QueryHandler
查询处理器必须实现QueryHandler
接口并实现__invoke
方法。
例如
<?php use NilPortugues\MessageBus\QueryBus\Contracts\Query; use NilPortugues\MessageBus\QueryBus\Contracts\QueryHandler; use NilPortugues\MessageBus\QueryBus\Contracts\QueryResponse; final class GetUserHandler implements QueryHandler { private $userRepository; public function __construct($userRepository) { $this->userRepository = $userRepository; } public function __invoke(GetUser $query) : QueryResponse { $userId = $query->getUserId(); $user = $this->userRepository->find($userId); return new UserQueryResponse($user); } }
2.1.3 - 创建一个 QueryResponse
响应查询是通用响应。
如果您考虑第2.1.2节,您会看到UserQueryResponse将$user注入到构造函数中。这样做是为了在其他场景中重用QueryResponse,例如获取更新的用户。
<?php class UserQueryResponse implements QueryResponse { public function __construct(User $user) { //fetch the relevant properties from the User object } //..getters. No setters required! }
2.1.4 - 注册 QueryHandler
我假设您正在使用某种服务容器。现在是时候注册您的QueryHandler了。
例如,在一个符合Psr\Container
规范的服务容器中,我们可以这样做:
<?php //...your other registered classes $container['GetUserHandler'] = function($container) { return new GetUserHandler($container->get('UserRepository')); };
2.1.5 - 设置 QueryBusMiddleware
Query Bus Middleware需要注入两个类。第一个是查询翻译器,第二个是处理器解析器。
QueryTranslator
实现此接口的类将为给定的查询提供处理器类的完全限定名。
本包提供了一个实现,NilPortugues\MessageBus\QueryBus\Translator\AppendStrategy
,它基本上将单词Handler
追加到提供的Query
类。
对于自定义策略,您可以编写自己的实现,实现NilPortugues\MessageBus\QueryBus\Contracts\QueryTranslator
接口。
QueryHandlerResolver
实现此接口的类将根据使用的查询翻译器的输出解析所需的类。
本包提供了一个实现,NilPortugues\MessageBus\QueryBus\Resolver\PsrContainerResolver
,它期望任何实现Psr\Container
接口的服务容器。
对于Symfony 2和3框架用户,截至版本3.2,您应使用Symfony容器:NilPortugues\MessageBus\QueryBus\Resolver\SymfonyContainerResolver
。对于3.3及以上版本,使用PSR-11 ContainerResolver类。
2.1.5 - 注册剩余的QueryBus类
使QueryBus正常工作的最小设置是
<?php //...your other registered classes $container['QueryTranslator'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\Translator\AppendStrategy('Handler'); }; $container['QueryHandlerResolver'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\Resolver\PsrContainerResolver($container); }; $container['QueryBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\QueryBusMiddleware( $container->get('QueryTranslator'), $container->get('QueryHandlerResolver'), ); }; $container['QueryBus'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\QueryBus([ $container->get('QueryBusMiddleware'), ]); };
例如,如果我们想记录QueryBus中发生的所有操作,我们将向中间件列表中添加日志记录中间件。这将包装QueryBus,能够在它运行前后进行日志记录,并在有错误时记录。
<?php //...your other registered classes $container['LoggerQueryBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\LoggerQueryBusMiddleware( $container->get('Monolog') ); }; //Update the QueryBus with the LoggerQueryBusMiddleware $container['QueryBus'] = function($container) { return new \NilPortugues\MessageBus\QueryBus\QueryBus([ $container->get('LoggerQueryBusMiddleware'), $container->get('QueryBusMiddleware'), ]); };
2.1.6 - 运行QueryBus
最后,要使用QueryBus,您只需要运行以下代码
<?php $queryBus = $container->get('QueryBus'); $query = new GetUser(1): $userQueryResponse = $queryBus($query);
2.2 - 预定义中间件
CacheQueryBusMiddleware
- 类:
NilPortugues\MessageBus\QueryBus\CacheQueryBusMiddleware
- 类构造方法期望一个序列化器(见下文)、一个PSR6缓存实现和队列名称。
LoggerQueryBusMiddleware
- 类:
NilPortugues\MessageBus\QueryBus\LoggerQueryBusMiddleware
- 类构造方法期望一个PSR3 Logger实现。
2.3 - 自定义中间件
要编写自定义中间件,需要一个实现NilPortugues\MessageBus\QueryBus\Contracts\QueryBusMiddleware
接口的新类。
3. EventBus
3.1 - 使用方法
3.1.1 - 创建一个事件
我们将创建一个事件。由于事件的本性,一个事件可以映射到一个或多个事件处理器。
<?php use NilPortugues\MessageBus\EventBus\Contracts\Event; final class UserRegistered implements Event { private $userId; private $email; public function __construct(string $userId, string $email) { $this->userId = $userId; $this->email = $email; } public function getUserId() : string { return $this->userId; } public function getEmail() : string { return $this->email; } }
3.1.2 - 创建一个 EventHandler
为了说明事件处理的强大功能,我们将之前的事件UserRegistered
映射到两个事件处理器。
第一个事件处理器
我们将创建的第一个事件处理器假设我们有一个用于发送欢迎邮件的电子邮件服务。
<?php use NilPortugues\MessageBus\EventBus\Contracts\Event; use NilPortugues\MessageBus\EventBus\Contracts\EventHandler; final class SendWelcomeEmailHandler implements EventHandler { private $emailProvider; public function __construct($emailProvider) { $this->emailProvider = $emailProvider; } public function __invoke(UserRegistered $event) { $this->guard($event); $this->emailProvider->send('welcome_email', $event->getEmail()); } public static function subscribedTo() : string { return UserRegistered::class; } }
第二个事件处理器
我们将创建第二个事件处理器以在数据库中建立用户朋友和用户信用关系。
<?php use NilPortugues\MessageBus\EventBus\Contracts\Event; use NilPortugues\MessageBus\EventBus\Contracts\EventHandler; final class SetupUserAccountHandler implements EventHandler { private $userFriendsRepository; private $userCreditsRepository; public function __construct($userFriendsRepository, $userCreditsRepository) { $this->userFriendsRepository = $userFriendsRepository; $this->userCreditsRepository = $userCreditsRepository; } public function __invoke(UserRegistered $event) { $this->userFriendsRepository->add(new UserFriendsCollection($event->getUserId(), [])); $this->userCreditsRepository->add(new UserCredits($event->getUserId(), new Credits(0)); } public static function subscribedTo() : string { return UserRegistered::class; } }
3.1.3 - (可选)设置 EventHandler 的优先级
有时您需要确保一个操作先于另一个操作执行。
默认情况下,所有事件都有一个优先级,这个优先级由EventHandlerPriority::LOW_PRIORITY
常量值设置。
要实现您在类中的优先级顺序,必须实现EventHandler
接口,并实现另一个接口,即EventHandlerPriority
。
例如,如果我们想SendWelcomeEmailHandler
在SetupUserAccountHandler
之后发生,我们应该给前者更低的优先级,或者给后者更高的优先级。
在分发事件UserRegistered时,SetupUserAccountHandler应该首先运行
<?php use NilPortugues\MessageBus\EventBus\Contracts\Event; use NilPortugues\MessageBus\EventBus\Contracts\EventHandler; use NilPortugues\MessageBus\EventBus\Contracts\EventHandlerPriority; final class SetupUserAccountHandler implements EventHandler, EventHandlerPriority { //same as before... public static function priority() : int { return EventHandlerPriority::MAX_PRIORITY; } }
在分发事件UserRegistered时,SendWelcomeEmailHandler应该其次运行
请注意,通过从MAX_PRIORITY顺序中减去来设置相对顺序是一个好主意。
<?php use NilPortugues\MessageBus\EventBus\Contracts\Event; use NilPortugues\MessageBus\EventBus\Contracts\EventHandler; use NilPortugues\MessageBus\EventBus\Contracts\EventHandlerPriority; final class SendWelcomeEmailHandler implements EventHandler, EventHandlerPriority { //same as before... public static function priority() : int { return EventHandlerPriority::MAX_PRIORITY - 1; } }
3.1.4 - 注册 EventHandler
我假设您正在使用某种服务容器。现在是时候注册您的事件处理器了。
例如,在一个符合Psr\Container
规范的服务容器中,我们可以这样做:
<?php //...your other registered classes $container['UserFriendRepository'] = function($container) { return []; //your repository }; $container['UserCreditsRepository'] = function($container) { return []; //your repository }; $container['EmailProvider'] = function($container) { return []; //your email provider }; $container['SetupUserAccountHandler'] = function($container) { return new SetupUserAccountHandler( $container->get('UserFriendRepository'), $container->get('UserCreditsRepository') ); }; $container['SendWelcomeEmailHandler'] = function($container) { return new SendWelcomeEmailHandler($container->get('EmailProvider'); };
3.1.5 - 设置 EventBusMiddleware
事件总线中间件需要注入两个类。第一个是事件翻译器,第二个是处理器解析器。
事件翻译器
负责注册订阅事件的事件处理器。
其实现可以在以下位置找到:NilPortugues\MessageBus\EventBus\Translator\EventFullyQualifiedClassNameStrategy
。
事件处理器解析器
实现此接口的类将根据事件翻译器使用的输出解析所需的类。
此包提供了一个实现,即NilPortugues\MessageBus\EventBus\Resolver\PsrContainerResolver
,它期望任何实现Psr\Container
接口的服务容器。
对于Symfony 2和3框架用户,版本3.2以下应使用Symfony容器:NilPortugues\MessageBus\EventBus\Resolver\SymfonyContainerResolver
。对于3.3及以上版本,使用PSR-11 ContainerResolver类。
3.1.6 - 注册剩余的 EventBus 类
使事件总线正常工作的最小设置是
<?php //...your other registered classes $container['EventTranslator'] = function($container) { $handlers = [ SendWelcomeEmailHandler::class, SetupUserAccountHandler::class, ]; return new \NilPortugues\MessageBus\EventBus\Translator\EventFullyQualifiedClassNameStrategy($handlers); }; $container['EventHandlerResolver'] = function($container) { return new \NilPortugues\MessageBus\EventBus\Resolver\PsrContainerResolver($container); }; $container['EventBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\EventBus\EventBusMiddleware( $container->get('EventTranslator'), $container->get('EventHandlerResolver'), ); }; $container['EventBus'] = function($container) { return new \NilPortugues\MessageBus\EventBus\EventBus([ $container->get('EventBusMiddleware'), ]); };
例如,如果我们想记录事件总线中发生的一切,我们将向中间件列表中添加日志记录中间件。这将包装事件总线,能够在它运行前后进行日志记录,并在有错误时记录。
<?php //...your other registered classes $container['LoggerEventBusMiddleware'] = function($container) { return new \NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware( $container->get('Monolog') ); }; //Update the EventBus with the LoggerEventBusMiddleware $container['EventBus'] = function($container) { return new \NilPortugues\MessageBus\EventBus\EventBus([ $container->get('LoggerEventBusMiddleware'), $container->get('EventBusMiddleware'), ]); };
3.1.7 - 运行 EventBus
最后,要使用事件总线,您只需要运行以下代码
<?php $eventBus = $container->get('EventBus'); $Event = new GetUser(1): $userEventResponse = $eventBus($Event);
3.1.8 - (可选)将 EventBus 作为队列运行
节省用户的时间并加快页面加载速度!使用队列进行异步操作。
为此,您需要引入一个额外的包:事件总线队列。此扩展可以通过composer下载。
composer require nilportugues/eventbus-queue
3.2 - 预定义中间件
TransactionalEventBusMiddleware
- 类:
NilPortugues\MessageBus\EventBus\TransactionalEventBusMiddleware
- 类的构造方法期望一个PDO连接。如果抛出任何类型的异常,它将使用beginTransaction-commit和rollback包装所有底层的中间件调用。
LoggerEventBusMiddleware
- 类:
NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware
- 类构造方法期望一个PSR3 Logger实现。
ProducerEventBusMiddleware
- 类:
NilPortugues\MessageBus\EventBusQueue\ProducerEventBusMiddleware
- 将事件添加到事件队列。首先需要运行
composer require nilportugues/eventbus-queue
。
3.3 - 自定义中间件
要编写自定义中间件,需要创建一个实现NilPortugues\MessageBus\EventBus\Contracts\EventBusMiddleware
接口的新类。
4 - 序列化器
序列化器主要用于所有<Name>ProducerEventBusMiddleware
类。您也可能在缓存类中找到它们。
根据您的需求选择一个或另一个。
4.1 - NilPortugues\MessageBus\Serializer\NativeSerializer
对于缓存,这是最佳选择。
在EventBus中使用此方法,如果您的消费者是用PHP编写的,并且将与序列化的对象共享相同的代码库。
- 优点:可能的最快序列化。
- 缺点:消费者必须用PHP编写,并且类必须是可用的,否则反序列化将失败。
4.2 - NilPortugues\MessageBus\Serializer\JsonSerializer
不建议用于缓存。
在EventBus中使用此方法,如果您的消费者是用PHP编写的,但消费者可能用多种语言编写。
- 优点:如果消费者是PHP,数据将被恢复,并且如果与序列化的对象共享相同的代码库,则可以在反序列化时获得对象。如果不共享,您可能可以像常规JSON一样手动获取数据。
- 缺点:如果从数据存储作为JSON获取数据,它将保留对PHP数据结构的引用,但不会干扰数据的消费。
4.3 - NilPortugues\MessageBus\Serializer\JsonObjectSerializer
不适用于缓存。
在EventBus中使用此方法,如果您的消费者是用PHP编写的,但目前没有进行数据消费。
- 优点:JSON可以重用于将来在支持其的任何地方消费这些数据。
- 缺点:您必须编写自己的消费者以读取JSON结构。
贡献
欢迎对该软件包做出贡献!
支持
您可以通过以下方式之一与我联系
- 通过contact@nilportugues.com给我发邮件
- 打开一个问题
作者
许可
代码库在MIT许可证下授权。