neos / event-sourcing
在您的 Flow 框架包中集成事件存储和 CQRS 模式的轻量级且具有意见的方法
资助包维护!
bwaidelich
robertlemke
skurfuerst
Requires
- php: >=8.0
- flowpack/jobqueue-common: ^3.0 || dev-master
- neos/flow: ^7.0 || ^8.0 || ^9.0 || dev-master
- ramsey/uuid: ^3.9 || ^4.0
- symfony/property-access: ^5.1
- symfony/serializer: ^5.1
Requires (Dev)
- roave/security-advisories: dev-master
- dev-master
- 2.7.0
- 2.7.0-beta.2
- 2.7.0-beta.1
- 2.6.0
- 2.5.0
- 2.4.2
- 2.4.1
- 2.4.0
- 2.3.3
- 2.3.2
- 2.3.1
- 2.3.0
- 2.2.0
- 2.1.0
- 2.0.2
- 2.0.1
- 2.0.0
- 1.0.x-dev
- 1.0.1
- 1.0.0
- dev-upgrade/symfony_packages
- dev-feature/encrypted-events
- dev-task/flow-9
- dev-event-stream-test
- dev-feature/282-extensible-eventApplier
- dev-analysis-QMLP6m
- dev-feature-docs-appliedeventsstorage-for-sideeffects
- dev-bugfix/refactor-functionaltest
- dev-escrStaticAnalysis
- dev-bugfix/291-avoid-transaction-in-eventstore-setup
- dev-feature/projection-snapshots
This package is auto-updated.
Last update: 2024-09-19 17:44:20 UTC
README
提供事件源应用程序接口和实现的库。
- 对于 Symfony,使用 Neos.EventSourcingSymfonyBridge 以将此包适配到 Symfony 世界。
- 对于 Neos/Flow,直接使用此包。
入门
通过 composer 安装此包
composer require neos/event-sourcing
设置事件存储
由于一个应用程序中可能同时存在多个事件存储,因此此包不再包含预配置的 "默认" 存储。只需几行 YAML 配置即可自定义存储
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': storage: 'Neos\EventSourcing\EventStore\Storage\Doctrine\DoctrineEventStorage'
这注册了一个名为 "Some.Package:EventStore" 的 Event Store,使用提供的 Doctrine 存储适配器将事件持久化到数据库表中。
要使用新配置的事件存储,还需要进行一个步骤以完成设置(在这种情况下,创建相应的数据库表)
./flow eventstore:setup Some.Package:EventStore
ℹ️ 注意...
默认情况下,事件存储将事件持久化到与 Flow 持久化相同的数据库。但由于可以另行配置,因此该表不是通过 Doctrine 迁移生成的。如果您的应用程序依赖于存在事件表,当然可以添加 Doctrine 迁移。
获取事件存储实例
要获取特定事件存储的实例,可以使用 EventStoreFactory
use Neos\EventSourcing\EventStore\EventStoreFactory; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var EventStoreFactory */ protected $eventStoreFactory; function someMethod() { $eventStore = $this->eventStoreFactory->create('Some.Package:EventStore'); } }
ℹ️ 其他方法...
或者,您可以直接注入事件存储
use Neos\EventSourcing\EventStore\EventStore; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var EventStore */ protected $eventStore; function someMethod() { // $this->eventStore->... } }
但在此情况下,您必须指定要注入的 哪个 事件存储。这可以通过 Flow 的 对象框架 容易实现
配置/Objects.yaml
Some\Package\SomeClass: properties: 'eventStore': object: factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory arguments: 1: value: 'Some.Package:EventStore'
如果您使用 Flow 6.2 或更高版本,您可以使用 虚拟对象配置 来简化此过程
配置/Objects.yaml
'Some.Package:EventStore': className: Neos\EventSourcing\EventStore\EventStore factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory arguments: 1: value: 'Some.Package:EventStore'
use Neos\EventSourcing\EventStore\EventStore; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject(name="Some.Package:EventStore") * @var EventStore */ protected $eventStore; }
最后,如果您在许多类中使用事件存储,您当然可以创建一个自定义事件存储外观,例如
Classes/CustomEventStore.php
<?php namespace Some\Package; use Neos\EventSourcing\Event\DomainEvents; use Neos\EventSourcing\EventStore\EventStore; use Neos\EventSourcing\EventStore\EventStoreFactory; use Neos\EventSourcing\EventStore\EventStream; use Neos\EventSourcing\EventStore\ExpectedVersion; use Neos\EventSourcing\EventStore\StreamName; use Neos\Flow\Annotations as Flow; /** * @Flow\Scope("singleton") */ final class CustomEventStore { /** * @var EventStore */ private $instance; public function __construct(EventStoreFactory $eventStoreFactory) { $this->instance = $eventStoreFactory->create('Some.Package:EventStore'); } public function load(StreamName $streamName, int $minimumSequenceNumber = 0): EventStream { return $this->instance->load($streamName, $minimumSequenceNumber); } public function commit(StreamName $streamName, DomainEvents $events, int $expectedVersion = ExpectedVersion::ANY): void { $this->instance->commit($streamName, $events, $expectedVersion); } }
然后注入它。
编写事件
示例事件: SomethingHasHappened.php
<?php namespace Some\Package; use Neos\EventSourcing\Event\DomainEventInterface; final class SomethingHasHappened implements DomainEventInterface { /** * @var string */ private $message; public function __construct(string $message) { $this->message = $message; } public function getMessage(): string { return $this->message; } }
<?php $event = new SomethingHasHappened('some message'); $streamName = StreamName::fromString('some-stream'); $eventStore->commit($streamName, DomainEvents::withSingleEvent($event));
读取事件
<?php $streamName = StreamName::fromString('some-stream'); $eventStream = $eventStore->load($streamName); foreach ($eventStream as $eventEnvelope) { // the event as it's stored in the Event Store, including its global sequence number and the serialized payload $rawEvent = $eventEnvelope->getRawEvent(); // the deserialized DomainEventInterface instance $domainEvent = $eventEnvelope->getDomainEvent(); }
使用事件监听器/投影器对事件做出反应
投影器是一种特殊的没有副作用(除了更新投影之外)的事件监听器,因此可以重置和回放。
为了对新事件做出反应,您需要一个 事件监听器
<?php namespace Some\Package; use Neos\EventSourcing\EventListener\EventListenerInterface; use Some\Package\SomethingHasHappened; class SomeEventListener implements EventListenerInterface { public function whenSomethingHasHappened(SomethingHasHappened $event): void { // do something with the $event } }
实现 EventListenerInterface
的类的 when*()
方法将在将相应的事件提交到事件存储时被调用。
由于可能有多个事件存储,因此必须将监听器 注册 到相应的存储
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': # ... listeners: 'Some\Package\SomeEventListener': true
这注册了 Some\Package\SomeEventListener
,以便在将相应的事件提交到 "Some.Package:EventStore" 时进行更新。
要注册所有/多个监听器到事件存储,您也可以使用正则表达式。
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': # ... listeners: 'Some\Package\.*': true
但是请注意,一个监听器只能注册到一个事件存储(否则在“编译时”将抛出异常)。
如果您实现了一个投影器,您应该实现ProjectorInterface
。
在投影更新后触发副作用
有时,在某个投影更新后,有必要刷新相关数据。
警告:如果可能,首先尝试构建第二个独立投影。在投影更新后刷新状态类似于“相关投影”,只有当投影的数据以另一种表示形式(例如数据仓库或搜索索引)存储时才有意义。
这可以通过两种方式实现
变体1:在您的投影器中实现AfterInvokeInterface
,并直接触发外部操作。
afterInvoke
方法对每个事件都会被触发,因此没有批处理或其他类似操作。这在简单场景下是可以的,但如果事件众多且总是导致类似的刷新操作,则不适用。
变体2:实现AfterCatchUpInterface
afterCatchUp
方法在投影器更新运行结束时被触发,可以用来对外部系统执行批量更新。
如果您想实现分块(即每触发例如100个事件时更新外部系统),您可以通过实现AfterInvokeInterface
和AfterCatchUpInterface
来实现:在afterInvoke
中,您会检查是否达到了块大小(如果是,则触发外部调用并重置您的跟踪状态)。在afterCatchUp
中,您会在结束时触发未完成的批次的剩余调用。
同步响应事件(即同步投影更新)
当拥抱异步性时,您会建立一个应用扩展点,应用程序可以“拆分”。
- 在此点上,应用程序更容易扩展(投影可以异步更新)。
- 不同的投影可以并行更新
- 对于长时间运行的操作,系统表现为非阻塞:您不需要等待可以响应客户端。
另一方面,异步性引入了复杂性,这会渗透到许多其他应用程序部分。通常,前端需要实现乐观更新和错误处理。
警告:您将放弃事件源的主要性能优势之一。在做决定之前三思而后行,并仔细考虑您对系统的假设,因为我们都有偏好“简单、同步世界”的倾向。
对于移动数据量较小的场景,由于同步执行不会导致性能问题,有时将模式切换回“同步”,其中投影在事件存储后直接更新是有用的。
如何强制投影(或其他事件监听器)同步运行?
您可以直接调用Neos\EventSourcing\EventListener\EventListenerInvoker::catchup()
方法——这会调用投影器(以及其他需要的监听器)。
最好是创建一个服务,其中包含以下片段,以同步更新每个投影器
// $eventStore is created by EventStoreFactory::create() // $someListener is the instanciated projector (a class implementing EventListenerInterface or ProjectorInterface) // usually $someListener can be injeced using @Flow\Inject( // $dbalConnection is the database connection being used to read and update the "reading point" of the projector, // i.e. how many events it has already seen. (interally implemented by DoctrineAppliedEventsStorage, and by default // stored in the database table neos_eventsourcing_eventlistener_appliedeventslog). // In a Flow Application, you can retrieve this $dbalConnection most simply by using $this->entityManager->getConnection() - where // $this->entityManager is an injected instance of Doctrine\ORM\EntityManagerInterface. $eventListenerInvoker = new EventListenerInvoker($eventStore, $someListener, $dbalConnection); $eventListenerInvoker->catchup();
事件源聚合
neos/event-sourcing
包附带一个基类,可以用于实现事件源聚合。
聚合构建
AbstractEventSourcedAggregateRoot
类有一个私有构造函数。要创建一个新的聚合实例,您应该定义一个命名构造函数
<?php declare(strict_types=1); namespace Some\Package; use Neos\EventSourcing\AbstractEventSourcedAggregateRoot; final class SomeAggregate extends AbstractEventSourcedAggregateRoot { /** * @var SomeAggregateId */ private $id; public static function create(SomeAggregateId $id): self { $instance = new static(); // This method will only be invoked once. Upon reconstitution only the when*() methods are called. // So we must never change the instance state directly (i.e. $instance->id = $id) but use events: $instance->recordThat(new SomeAggregateWasCreated($id)); return $instance; } public function whenSomeAggregateWasCreated(SomeAggregateWasCreated $event): void { $this->id = $event->getId(); } }
聚合存储库
此框架不提供抽象存储库类,因为实现只需要几行代码,且没有可以提取的有用抽象。存储库只是事件存储和聚合类的一个瘦包装器
final class ProductRepository { /** * @var EventStore */ private $eventStore; public function __construct(EventStore $eventStore) { $this->eventStore = $eventStore; } public function load(SomeAggregateId $id): SomeAggregate { $streamName = $this->getStreamName($id); return SomeAggregate::reconstituteFromEventStream($this->eventStore->load($streamName)); } public function save(SomeAggregate $aggregate): void { $streamName = $this->getStreamName($aggregate->id()); $this->eventStore->commit($streamName, $aggregate->pullUncommittedEvents(), $aggregate->getReconstitutionVersion()); } private function getStreamName(SomeAggregateId $id): StreamName { // we assume that the aggregate stream name is "some-aggregate-<aggregate-id>" return StreamName::fromString('some-aggregate-' . $id); } }
教程
术语表
请参阅 Glossary.md
1:事件存储标识符是任意的,但为了防止命名冲突,建议在前面加上包键 – [返回](#user-content-a1) 2:Doctrine 事件存储使用为 Flow 配置的相同数据库连接,默认情况下在 neos_eventsourcing_eventstore_events
表中持久化事件 – 这可以调整,请参阅 Settings.yaml – [返回](#user-content-a2)