etg24 / eventsourcing
一个用于使用EventSourcing的TYPO3 Flow包
Requires
- dbellettini/eventstore-client: ~0.4
- pda/pheanstalk: 3.0.*
- typo3/flow: dev-master
This package is not auto-updated.
Last update: 2020-08-25 13:37:24 UTC
README
此包为TYPO3 Flow提供基本的CQRS/ES基础设施。
其目的是为您提供编写自己的定制工具集以与CQRS/ES一起工作的灵感。我强烈建议在理解底层概念之前不要使用此包。
安装
$ composer require etg24/eventsourcing dev-master
命令
命令定义了您领域模型的接口。它们进入命令总线,然后将被命令处理器(同步/异步)处理。命令永远不会返回任何数据,因此处理命令的接口类型为void
。
定义命令
您可以通过扩展Etg24\EventSourcing\Command\Command
(我将很快更新代码库以使用CommandInterface
,但在那之前您需要从Command
继承)来定义一个命令。命令有一个commandId
,可以用于日志记录目的,如果需要的话。
<?php namespace Vendor\Foo\Command; use Etg24\EventSourcing\Command\Command; /** * Any doc comment will be part of the CLI documentation! */ class AddInventoryItemToBasket extends Command { /** * @var string */ public $basketId; /** * You will see this in the CLI! * * @var string */ public $inventoryItemId; /** * @param string $basketId * @param string $inventoryItemId */ public function __construct($basketId, $inventoryItemId) { parent::__construct(); $this->basketId = $basketId; $this->inventoryItemId = $inventoryItemId; } }
命令处理器
命令处理器执行来自命令总线的命令。它们是您应用程序层的一部分,只编排领域模型。它们的工作是解决依赖关系并将它们传递给领域模型。
您可以通过实现接口Etg24\EventSourcing\Command\Handler\CommandHandlerInterface
来定义一个命令处理器。或者,您可以扩展Etg24\EventSourcing\Command\CommandHandler
,它将提供一个正常处理器的默认实现。
Etg24.EventSourcing提供的命令处理器有方法命名的约定:handle<CommandName>Command
。命令名称是简单的类名。为了避免冲突,还需要保证命令处理器的命名空间为CommandHandler
和命令的Command
。请参阅下面的示例文件夹结构和实现。
- Vendor
- Foo
- CommandHandler
- BasketCommandHandler.php
- Command
- AddInventoryItemToBasket.php
以下是BasketCommandHandler的实现,用于处理AddInventoryItemToBasket命令。
<?php namespace Vendor\Foo\CommandHandler; use Vendor\Foo\Command\AddInventoryItemToBasket; use Vendor\Foo\Domain\Repository\BasketRepository; use Etg24\EventSourcing\Command\CommandHandler; use TYPO3\Flow\Annotations as Flow; /** * @Flow\Scope("singleton") */ class BasketCommandHandler extends CommandHandler { /** * @var BasketRepository * @Flow\Inject */ protected $basketRepository; /** * @param AddInventoryItemToBasket $command */ protected function handleAddInventoryItemToBasketCommand(AddInventoryItemToBasket $command) { // let's assume it exists $basket = $this->basketRepository->find($command->basketId); $basket->addInventoryItem($command->inventoryItemId); $this->basketRepository->save($basket); } }
命令总线
为了使命令处理器处理命令,使用命令总线。您可以在事件处理器或ActionControllers中注入命令总线。以下示例说明了在ActionController中的使用。
<?php namespace Vendor\Foo\Controller; // ... NS imports class BasketController extends ActionController { /** * @var InternalCommandBus * @Flow\Inject */ protected $commandBus; /** * @param string $basketId * @param string $inventoryItemId */ public function addInventoryItemToBasketAction($basketId, $inventoryItemId) { // exceptions can and will be thrown here, catch accordingly // at least, until error handling is implemented $this->commandBus->handle(new AddInventoryItemToBasket( $basketId, $inventoryItemId )); // redirect } }
理想情况下,您会编写一个TypeConverter,自动将例如POST数据转换为命令,在单个ActionController中处理它们。
使用CLI
默认情况下,所有命令都通过Flow CLI暴露。一旦定义,您可以使用
$ ./flow help
$ ./flow help foo:addinventoryitemtobasket
$ ./flow foo:addinventoryitemtobasket --basket-id="2" --inventory-item-id="1"
来查看任何给定命令所需的参数并执行它们。
要禁用命令CLI访问,请像这样编辑您的项目的Settings.yaml
Etg24: EventSourcing: Command: Controller: enabled: false
或者,您可以通过将它们标记为内部来从正常CLI用户中隐藏命令。这样,命令仍然可以访问,但不再由./flow help
打印。
Etg24: EventSourcing: Command: Controller: enabled: true markAsInternal: true
事件
领域事件由聚合创建,由仓库检索,存储在事件存储中并通过事件总线发布。它们是事件源模型中的唯一真相来源,并定义了您的模型状态。
定义事件
事件通过扩展Etg24\EventSourcing\Event\DomainEvent
来定义。请注意,必须调用父类的构造函数以生成事件发生时的日期(我还在考虑是否使用AOP等来执行此操作)。
<?php namespace Vendor\Foo\Domain\Event; use Etg24\EventSourcing\Event\DomainEvent; class InventoryItemToBaskedAdded extends DomainEvent { /** * @var string */ public $basketId; /** * @var string */ public $inventoryItemId; /** * @param string $basketId * @param string $inventoryItemId */ public function __construct($basketId, $inventoryItemId) { parent::__construct(); $this->basketId = $basketId; $this->inventoryItemId = $inventoryItemId; } }
仓库
仓库用于保存和检索聚合。Flow的命名规范也适用于此处。聚合Basket
的BasketRepository
看起来像这样
namespace Vendor\Foo\Domain\Repository; use Etg24\EventSourcing\Store\Repository; use TYPO3\Flow\Annotations as Flow; /** * @Flow\Scope("singleton") */ class BasketRepository extends Repository {}
默认事件存储后端是EventStore的EventStoreBackend
。您可以实现StoreBackendInterface
并使用Objects.yaml
来使用您自己的实现。
事件总线
事件总线通常只应由仓库使用。我很快将添加一些接口,以便根据事件是否需要异步或同步处理来配置不同的队列。
事件处理器
类似于命令处理器,事件处理器处理在事件总线上发布的事件。事件处理器订阅一个或多个领域事件。要创建事件处理器并使其监听事件,请实现EventHandlerInterface
(异步)或ImmediateEventHandlerInterface
(同步)接口。然后,根据接口,事件总线将事件推送到队列或将它们直接传递以进行处理(将通过不同的队列完成,但API不会更改)。
通常,您会使用事件处理器来处理最终一致性或发送电子邮件等问题。以下示例说明了AbstractEventHandler
及其约定的用法。
定义事件处理器
<?php namespace Vendor\Foo\Service; // ... NS imports class InformingCustomerAboutAddedInventoryItemService extends AbstractEventHandler implements ImmediateEventHandlerInterface { /** * @var array */ protected $subscribedToEvents = [ InventoryItemToBaskedAdded::class ]; /** * @param InventoryItemToBaskedAdded $event */ protected function handleInventoryItemToBaskedAddedEvent(InventoryItemToBaskedAdded $event) { // this is probably a bad idea to do, but certainly possible // needs some more checking though ;) $this->flashMessageContainer->addFlashMessage('Hi! The inventory item "' . $event->inventoryItemId . '" has been added to your basket.'); } }
聚合
在事件源环境中,聚合是通过领域事件来重建的!您可以通过实现 Etg24\EventSourcing\AggregateRootInterface
来创建一个聚合类。特质 Etg24\EventSourcing\AggregateSourcing
包含满足接口的行为,并为您的聚合提供合理的默认实现。
实例化
您可以使用PHP中的任何其他类来实例化新的聚合实例。请注意,没有自动生成标识符。特质 AggregateSourcing
添加了一个属性 $identifier
但没有填充它。在考虑 命令 时,您可能会在发送命令之前生成聚合的标识符。
$basket = new Basket('123');
发布新的领域事件
一旦创建,您就可以发布领域事件。领域事件只从聚合(或 实体)内部发布。
public function addInventoryItem($inventoryItemId) { // business logic // validation logic $this->applyNewEvent(new InventoryItemToBaskedAdded($this->identifier, $inventoryItemId)); }
现在,在将此聚合保存到存储库时,新事件将被写入到 事件存储。
从事件流中加载聚合
通常,存储库 会为您处理现有聚合的加载。然而,在测试时,您可能希望手动加载它们。特质 AggregateSourcing
实现了静态方法 loadFromEventStream
,这将实例化并应用事件。
$existingBasket = Basket::loadFromEventStream([ new BasketCreated(123), new InventoryItemToBaskedAdded(123, 1) ]);
然后,特质将在不调用其构造函数的情况下实例化一个新的 Basket
实例,然后应用每个事件。为此,您必须实现事件应用方法。这种约定的格式是 on<EventName>
。
/** * @param BasketCreated $event */ protected function onBasketCreated(BasketCreated $event) { $this->identifier = $event->basketId; } /** * @param InventoryItemToBaskedAdded $event */ protected function onInventoryItemToBaskedAdded(InventoryItemToBaskedAdded $event) { $this->inventoryItems[] = $event->inventoryItemId; }
如果由于缺少应用方法而无法应用事件,特质将抛出异常。在处理您的模型中仅涉及 CRUD 部分的模型时,您可能希望在这方面不那么严格。这样,您可以在模型中获得提炼后的业务逻辑,而不带任何噪音。
购物车聚合
<?php namespace Vendor\Foo\Domain\Model; // ... NS imports class Basket implements AggregateRootInterface { use AggregateSourcing; /** * @var array<string> */ protected $inventoryItems = []; /** * @param string $basketId */ public function __construct($basketId) { $this->applyNewEvent(new BasketCreated($basketId)); } /** * @param string $inventoryItemId */ public function addInventoryItem($inventoryItemId) { // business logic // validation logic $this->applyNewEvent(new InventoryItemToBaskedAdded($this->identifier, $inventoryItemId)); } /** * @param BasketCreated $event */ protected function onBasketCreated(BasketCreated $event) { $this->identifier = $event->basketId; } /** * @param InventoryItemToBaskedAdded $event */ protected function onInventoryItemToBaskedAdded(InventoryItemToBaskedAdded $event) { $this->inventoryItems[] = $event->inventoryItemId; } }
实体
实体与聚合(技术上)非常相似,但它们由聚合维护和拥有。实体的生命周期是聚合的责任,您必须始终通过聚合访问实体。
实体必须实现 Etg24\EventSourcing\EntityInterface
,特质 Etg24\EventSourcing\EntitySourcing
提供了行为。
在创建实体时,您必须将实体注册到聚合中
public function onSomeThingsHappened(SomeThingsHappened $event) { $entity = new Entity(..); $this->entities[$entity->getIdentifier()] = $entity; $this->registerEntity($entity); }
注意聚合是如何处理创建实体的事件的,而不是实体。这就是为什么在 Entity::__construct
方法中没有检查,因为这就像是一个应用事件的方法。
class Entity implements EntityInterface { use EntitySourcing; public function __construct($entityId) { $this->identifier = $entityId; } }
这确保了事件也会转发到每个实体。然后,实体必须通过实现 canApplyEvent
来暴露它们订阅的事件。
public function canApplyEvent(DomainEvent $event) { if ($event instanceof EntityEvent) { return ($event->entityId === $this->identifier); } return FALSE; }
应用事件的工作方式与在聚合根中回放事件完全一样。
存储
目前,唯一实现的存储后端是EventStore。您可以实现StoreBackendInterface
并使用Objects.yaml
来使用自己的实现。
投影
投影,也称为查询模型,用于查询事件源环境中的数据。基本上,它们只是更新某些查询优化数据库的事件处理器。
todo:为MysqlProjector添加一些示例
队列
消息队列可用于异步或同步地处理命令和事件。目前,唯一工作的队列是处理消息的同步队列ImmediateQueue
。我的计划是将来使用TYPO3.Jobqueue(无需重新发明轮子)。
序列化
目前实现了两个序列化器
- ArraySerializer:将消息转换为数组
- JsonSerializer:将消息转换为JSON字符串
测试
todo:关于如何测试事件源模型(提示:不是通过使用getter)
待办事项
- 完成此文档
- 每个处理器(命令/事件/投影)中的异常处理
- 实现快照
- 为包编写测试(是的,这个包完全是未经测试的,但它工作得很好:o!)
许可证
Etg24.EventSourcing遵循MIT许可证。