neos / cqrs
资助包维护!
bwaidelich
robertlemke
skurfuerst
Requires
- php: >=8.0
- flowpack/jobqueue-common: ^3.0 || dev-master
- neos/flow: ^7.0 || ^8.0 || ^9.0 || dev-master
- ramsey/uuid: ^3.9 || ^4.0
- symfony/property-access: ^5.1
- symfony/serializer: ^5.1
Requires (Dev)
- roave/security-advisories: dev-master
- dev-master
- 2.7.0
- 2.7.0-beta.2
- 2.7.0-beta.1
- 2.6.0
- 2.5.0
- 2.4.2
- 2.4.1
- 2.4.0
- 2.3.3
- 2.3.2
- 2.3.1
- 2.3.0
- 2.2.0
- 2.1.0
- 2.0.2
- 2.0.1
- 2.0.0
- 1.0.x-dev
- 1.0.1
- 1.0.0
- dev-upgrade/symfony_packages
- dev-feature/encrypted-events
- dev-task/flow-9
- dev-event-stream-test
- dev-feature/282-extensible-eventApplier
- dev-analysis-QMLP6m
- dev-feature-docs-appliedeventsstorage-for-sideeffects
- dev-bugfix/refactor-functionaltest
- dev-escrStaticAnalysis
- dev-bugfix/291-avoid-transaction-in-eventstore-setup
- dev-feature/projection-snapshots
This package is auto-updated.
Last update: 2024-02-19 16:34:38 UTC
README
提供接口和实现的库,用于事件源应用程序。
- 对于Symfony,请使用Neos.EventSourcingSymfonyBridge,该桥接器将此包适配到Symfony世界。
- 对于Neos/Flow,直接使用此包。
入门
通过composer安装此包
composer require neos/event-sourcing
设置事件存储
由于一个应用程序中可能同时存在多个事件存储,此包不再包含预配置的“默认”存储。只需几行YAML配置即可设置自定义存储
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': storage: 'Neos\EventSourcing\EventStore\Storage\Doctrine\DoctrineEventStorage'
这注册了一个名为"Some.Package:EventStore"的事件存储,它使用提供的Doctrine存储适配器将事件持久化到数据库表中。
为了使用新配置的事件存储,还需要执行一个额外步骤才能完成设置(在这种情况下是创建相应的数据库表)
./flow eventstore:setup Some.Package:EventStore
ℹ️ 注意...
默认情况下,事件存储将事件持久化到与Flow持久化相同的数据库。但由于这可能被配置为其他方式,因此此表不是通过Doctrine迁移生成的。如果您的应用程序依赖于事件表的存在,当然可以添加Doctrine迁移以支持它。
获取事件存储
要获取特定事件存储的实例,可以使用EventStoreFactory
。
use Neos\EventSourcing\EventStore\EventStoreFactory; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var EventStoreFactory */ protected $eventStoreFactory; function someMethod() { $eventStore = $this->eventStoreFactory->create('Some.Package:EventStore'); } }
ℹ️ 其他方法...
或者,您可以直接注入事件存储
use Neos\EventSourcing\EventStore\EventStore; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var EventStore */ protected $eventStore; function someMethod() { // $this->eventStore->... } }
但在此情况下,您必须指定要注入的哪个事件存储。这可以使用Flow的对象框架轻松完成
配置/Objects.yaml
Some\Package\SomeClass: properties: 'eventStore': object: factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory arguments: 1: value: 'Some.Package:EventStore'
如果您使用Flow 6.2或更高版本,可以使用虚拟对象配置来简化此过程
配置/Objects.yaml
'Some.Package:EventStore': className: Neos\EventSourcing\EventStore\EventStore factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory arguments: 1: value: 'Some.Package:EventStore'
use Neos\EventSourcing\EventStore\EventStore; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject(name="Some.Package:EventStore") * @var EventStore */ protected $eventStore; }
最后,如果您在许多类中使用事件存储,当然可以创建一个自定义事件存储外观,例如
Classes/CustomEventStore.php
<?php namespace Some\Package; use Neos\EventSourcing\Event\DomainEvents; use Neos\EventSourcing\EventStore\EventStore; use Neos\EventSourcing\EventStore\EventStoreFactory; use Neos\EventSourcing\EventStore\EventStream; use Neos\EventSourcing\EventStore\ExpectedVersion; use Neos\EventSourcing\EventStore\StreamName; use Neos\Flow\Annotations as Flow; /** * @Flow\Scope("singleton") */ final class CustomEventStore { /** * @var EventStore */ private $instance; public function __construct(EventStoreFactory $eventStoreFactory) { $this->instance = $eventStoreFactory->create('Some.Package:EventStore'); } public function load(StreamName $streamName, int $minimumSequenceNumber = 0): EventStream { return $this->instance->load($streamName, $minimumSequenceNumber); } public function commit(StreamName $streamName, DomainEvents $events, int $expectedVersion = ExpectedVersion::ANY): void { $this->instance->commit($streamName, $events, $expectedVersion); } }
并注入它。
编写事件
示例事件:SomethingHasHappened.php
<?php namespace Some\Package; use Neos\EventSourcing\Event\DomainEventInterface; final class SomethingHasHappened implements DomainEventInterface { /** * @var string */ private $message; public function __construct(string $message) { $this->message = $message; } public function getMessage(): string { return $this->message; } }
<?php $event = new SomethingHasHappened('some message'); $streamName = StreamName::fromString('some-stream'); $eventStore->commit($streamName, DomainEvents::withSingleEvent($event));
读取事件
<?php $streamName = StreamName::fromString('some-stream'); $eventStream = $eventStore->load($streamName); foreach ($eventStream as $eventEnvelope) { // the event as it's stored in the Event Store, including its global sequence number and the serialized payload $rawEvent = $eventEnvelope->getRawEvent(); // the deserialized DomainEventInterface instance $domainEvent = $eventEnvelope->getDomainEvent(); }
使用事件监听器/投影器对事件做出响应
投影仪是一种特殊的事件监听器,它没有副作用(除了更新投影之外),因此可以被重置和回放。
为了对新事件做出反应,你需要一个事件监听器
<?php namespace Some\Package; use Neos\EventSourcing\EventListener\EventListenerInterface; use Some\Package\SomethingHasHappened; class SomeEventListener implements EventListenerInterface { public function whenSomethingHasHappened(SomethingHasHappened $event): void { // do something with the $event } }
实现EventListenerInterface
的类的when*()
方法将在相应的事件被提交到事件存储时被调用。
由于可能存在多个事件存储,监听器必须注册到相应的存储。
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': # ... listeners: 'Some\Package\SomeEventListener': true
这注册了Some\Package\SomeEventListener
,以便在将相应事件提交到"Some.Package:EventStore"时进行更新。
要使用正则表达式注册事件存储的所有/多个监听器,也可以这样做
配置/Settings.yaml
Neos: EventSourcing: EventStore: stores: 'Some.Package:EventStore': # ... listeners: 'Some\Package\.*': true
但请注意,监听器只能注册到单个事件存储(否则在"编译时"将抛出异常)。
如果你实现了投影仪,你应该实现ProjectorInterface
。
在投影更新后触发副作用
有时,在某个投影更新后刷新依赖数据是必要的。
警告:如果可能,首先尽量构建第二个独立的投影。在投影更新后刷新状态类似于"依赖投影",这只有在投影的相同数据以另一种表示形式(例如数据仓库或搜索索引)存储时才有意义。
这可以通过两种方式实现
变体1:在你的投影仪中实现AfterInvokeInterface
,并直接触发外部操作。
afterInvoke
方法对每个事件都会被触发,因此没有批处理或其他类似操作。这在简单场景下是可行的,但如果事件量很大且总是导致类似的刷新操作,则不适用。
变体2:实现AfterCatchUpInterface
afterCatchUp
方法在投影更新运行结束时被触发,可以用来对外部系统执行批量更新。
如果你想实现分块(即在每100个事件后触发外部系统的更新),你可以通过实现AfterInvokeInterface
和AfterCatchUpInterface
来实现:在afterInvoke
中,你会检查是否达到了块大小(如果是,则触发外部调用并重置你的跟踪状态)。在afterCatchUp
中,你将在结束时触发未完成的批次的剩余调用。
同步响应对事件(即投影同步更新)
当接受异步性时,你建立了一个缩放点,在这个点上应用程序可以被"拆分"。
- 应用程序可以更容易地在这个点上扩展(投影可以异步更新)
- 不同的投影可以并行更新
- 对于长时间运行的操作,系统表现为非阻塞:你不需要等待直到可以响应用户。
另一方面,异步性引入了复杂性,这会泄漏到应用程序的其他许多部分。通常,前端需要实现乐观更新和错误处理。
警告:你将放弃事件源的主要性能优势之一。在做这件事之前三思而后行,并仔细考虑你对系统的假设,因为我们都有偏好“简单、同步世界”的倾向。
对于少量移动数据的情况,由于同步执行不会导致性能问题,有时将模式切换回“同步”模式是有用的,在这种模式下,事件存储后投影将直接更新。
我们如何强制投影(或另一个事件监听器)同步运行?
您可以直接调用 Neos\EventSourcing\EventListener\EventListenerInvoker::catchup()
方法 - 这将调用投影(以及所需的其他事件监听器)。
最好的做法是为您想要同步更新的每个投影创建一个包含以下片段的服务:
// $eventStore is created by EventStoreFactory::create() // $someListener is the instanciated projector (a class implementing EventListenerInterface or ProjectorInterface) // usually $someListener can be injeced using @Flow\Inject( // $dbalConnection is the database connection being used to read and update the "reading point" of the projector, // i.e. how many events it has already seen. (interally implemented by DoctrineAppliedEventsStorage, and by default // stored in the database table neos_eventsourcing_eventlistener_appliedeventslog). // In a Flow Application, you can retrieve this $dbalConnection most simply by using $this->entityManager->getConnection() - where // $this->entityManager is an injected instance of Doctrine\ORM\EntityManagerInterface. $eventListenerInvoker = new EventListenerInvoker($eventStore, $someListener, $dbalConnection); $eventListenerInvoker->catchup();
事件源聚合
neos/event-sourcing
包提供了一个基类,可以用来实现 事件源聚合。
聚合构造
AbstractEventSourcedAggregateRoot
类有一个私有构造函数。为了创建一个新的聚合实例,您应该定义一个命名构造函数
<?php declare(strict_types=1); namespace Some\Package; use Neos\EventSourcing\AbstractEventSourcedAggregateRoot; final class SomeAggregate extends AbstractEventSourcedAggregateRoot { /** * @var SomeAggregateId */ private $id; public static function create(SomeAggregateId $id): self { $instance = new static(); // This method will only be invoked once. Upon reconstitution only the when*() methods are called. // So we must never change the instance state directly (i.e. $instance->id = $id) but use events: $instance->recordThat(new SomeAggregateWasCreated($id)); return $instance; } public function whenSomeAggregateWasCreated(SomeAggregateWasCreated $event): void { $this->id = $event->getId(); } }
聚合仓库
该框架没有为聚合提供抽象仓库类,因为实现只是几行代码,并且无法提取出有用的抽象。仓库只是 EventStore 和聚合类的一个瘦包装器
final class ProductRepository { /** * @var EventStore */ private $eventStore; public function __construct(EventStore $eventStore) { $this->eventStore = $eventStore; } public function load(SomeAggregateId $id): SomeAggregate { $streamName = $this->getStreamName($id); return SomeAggregate::reconstituteFromEventStream($this->eventStore->load($streamName)); } public function save(SomeAggregate $aggregate): void { $streamName = $this->getStreamName($aggregate->id()); $this->eventStore->commit($streamName, $aggregate->pullUncommittedEvents(), $aggregate->getReconstitutionVersion()); } private function getStreamName(SomeAggregateId $id): StreamName { // we assume that the aggregate stream name is "some-aggregate-<aggregate-id>" return StreamName::fromString('some-aggregate-' . $id); } }
教程
请参阅 Tutorial.md
术语表
请参阅 Glossary.md
1:事件存储标识符是任意的,但为了防止命名冲突,建议使用包键进行前缀命名 ↩ 2:Doctrine 事件存储使用为 Flow 配置的相同数据库连接,默认情况下在 neos_eventsourcing_eventstore_events
表中持久化事件 - 这可以进行调整,请参阅 Settings.yaml ↩