neos/event-sourcing

在您的 Flow 框架包中集成事件存储和 CQRS 模式的轻量级且具有意见的方法

安装次数: 113 732

依赖项: 9

建议者: 0

安全: 0

星标: 44

关注者: 12

分支: 30

开放问题: 21

类型:neos-package


README

提供事件源应用程序接口和实现的库。

入门

通过 composer 安装此包

composer require neos/event-sourcing

设置事件存储

由于一个应用程序中可能同时存在多个事件存储,因此此包不再包含预配置的 "默认" 存储。只需几行 YAML 配置即可自定义存储

配置/Settings.yaml

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          storage: 'Neos\EventSourcing\EventStore\Storage\Doctrine\DoctrineEventStorage'

这注册了一个名为 "Some.Package:EventStore" 的 Event Store,使用提供的 Doctrine 存储适配器将事件持久化到数据库表中。

要使用新配置的事件存储,还需要进行一个步骤以完成设置(在这种情况下,创建相应的数据库表)

./flow eventstore:setup Some.Package:EventStore
ℹ️  注意...

默认情况下,事件存储将事件持久化到与 Flow 持久化相同的数据库。但由于可以另行配置,因此该表不是通过 Doctrine 迁移生成的。如果您的应用程序依赖于存在事件表,当然可以添加 Doctrine 迁移。

获取事件存储实例

要获取特定事件存储的实例,可以使用 EventStoreFactory

use Neos\EventSourcing\EventStore\EventStoreFactory;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var EventStoreFactory
     */
    protected $eventStoreFactory;

    function someMethod() {
        $eventStore = $this->eventStoreFactory->create('Some.Package:EventStore');
    }
}
ℹ️  其他方法...

或者,您可以直接注入事件存储

use Neos\EventSourcing\EventStore\EventStore;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var EventStore
     */
    protected $eventStore;

    function someMethod() {
        // $this->eventStore->...
    }
}

但在此情况下,您必须指定要注入的 哪个 事件存储。这可以通过 Flow 的 对象框架 容易实现

配置/Objects.yaml

Some\Package\SomeClass:
  properties:
    'eventStore':
      object:
        factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory
        arguments:
          1:
            value: 'Some.Package:EventStore'

如果您使用 Flow 6.2 或更高版本,您可以使用 虚拟对象配置 来简化此过程

配置/Objects.yaml

'Some.Package:EventStore':
  className: Neos\EventSourcing\EventStore\EventStore
  factoryObjectName: Neos\EventSourcing\EventStore\EventStoreFactory
  arguments:
    1:
      value: 'Some.Package:EventStore'
use Neos\EventSourcing\EventStore\EventStore;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject(name="Some.Package:EventStore")
     * @var EventStore
     */
    protected $eventStore;
}

最后,如果您在许多类中使用事件存储,您当然可以创建一个自定义事件存储外观,例如

Classes/CustomEventStore.php

<?php
namespace Some\Package;

use Neos\EventSourcing\Event\DomainEvents;
use Neos\EventSourcing\EventStore\EventStore;
use Neos\EventSourcing\EventStore\EventStoreFactory;
use Neos\EventSourcing\EventStore\EventStream;
use Neos\EventSourcing\EventStore\ExpectedVersion;
use Neos\EventSourcing\EventStore\StreamName;
use Neos\Flow\Annotations as Flow;

/**
 * @Flow\Scope("singleton")
 */
final class CustomEventStore
{

    /**
     * @var EventStore
     */
    private $instance;

    public function __construct(EventStoreFactory $eventStoreFactory)
    {
        $this->instance = $eventStoreFactory->create('Some.Package:EventStore');
    }

    public function load(StreamName $streamName, int $minimumSequenceNumber = 0): EventStream
    {
        return $this->instance->load($streamName, $minimumSequenceNumber);
    }

    public function commit(StreamName $streamName, DomainEvents $events, int $expectedVersion = ExpectedVersion::ANY): void
    {
        $this->instance->commit($streamName, $events, $expectedVersion);
    }
}

然后注入它。

编写事件

示例事件: SomethingHasHappened.php
<?php
namespace Some\Package;

use Neos\EventSourcing\Event\DomainEventInterface;

final class SomethingHasHappened implements DomainEventInterface
{
    /**
     * @var string
     */
    private $message;

    public function __construct(string $message)
    {
        $this->message = $message;
    }

    public function getMessage(): string
    {
        return $this->message;
    }

}
<?php
$event = new SomethingHasHappened('some message');
$streamName = StreamName::fromString('some-stream');
$eventStore->commit($streamName, DomainEvents::withSingleEvent($event));

读取事件

<?php
$streamName = StreamName::fromString('some-stream');
$eventStream = $eventStore->load($streamName);
foreach ($eventStream as $eventEnvelope) {
    // the event as it's stored in the Event Store, including its global sequence number and the serialized payload
    $rawEvent = $eventEnvelope->getRawEvent();

    // the deserialized DomainEventInterface instance 
    $domainEvent = $eventEnvelope->getDomainEvent();
}

使用事件监听器/投影器对事件做出反应

投影器是一种特殊的没有副作用(除了更新投影之外)的事件监听器,因此可以重置和回放。

为了对新事件做出反应,您需要一个 事件监听器

<?php
namespace Some\Package;

use Neos\EventSourcing\EventListener\EventListenerInterface;
use Some\Package\SomethingHasHappened;

class SomeEventListener implements EventListenerInterface
{

    public function whenSomethingHasHappened(SomethingHasHappened $event): void
    {
        // do something with the $event
    }

}

实现 EventListenerInterface 的类的 when*() 方法将在将相应的事件提交到事件存储时被调用。

由于可能有多个事件存储,因此必须将监听器 注册 到相应的存储

配置/Settings.yaml

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          # ...
          listeners:
            'Some\Package\SomeEventListener': true

这注册了 Some\Package\SomeEventListener,以便在将相应的事件提交到 "Some.Package:EventStore" 时进行更新。

要注册所有/多个监听器到事件存储,您也可以使用正则表达式。

配置/Settings.yaml

Neos:
  EventSourcing:
    EventStore:
      stores:
        'Some.Package:EventStore':
          # ...
          listeners:
            'Some\Package\.*': true

但是请注意,一个监听器只能注册到一个事件存储(否则在“编译时”将抛出异常)。

如果您实现了一个投影器,您应该实现ProjectorInterface

在投影更新后触发副作用

有时,在某个投影更新后,有必要刷新相关数据。

警告:如果可能,首先尝试构建第二个独立投影。在投影更新后刷新状态类似于“相关投影”,只有当投影的数据以另一种表示形式(例如数据仓库或搜索索引)存储时才有意义。

这可以通过两种方式实现

变体1:在您的投影器中实现AfterInvokeInterface,并直接触发外部操作。

afterInvoke方法对每个事件都会被触发,因此没有批处理或其他类似操作。这在简单场景下是可以的,但如果事件众多且总是导致类似的刷新操作,则不适用。

变体2:实现AfterCatchUpInterface

afterCatchUp方法在投影器更新运行结束时被触发,可以用来对外部系统执行批量更新。

如果您想实现分块(即每触发例如100个事件时更新外部系统),您可以通过实现AfterInvokeInterfaceAfterCatchUpInterface来实现:在afterInvoke中,您会检查是否达到了块大小(如果是,则触发外部调用并重置您的跟踪状态)。在afterCatchUp中,您会在结束时触发未完成的批次的剩余调用。

同步响应事件(即同步投影更新)

当拥抱异步性时,您会建立一个应用扩展点,应用程序可以“拆分”。

  • 在此点上,应用程序更容易扩展(投影可以异步更新)。
  • 不同的投影可以并行更新
  • 对于长时间运行的操作,系统表现为非阻塞:您不需要等待可以响应客户端。

另一方面,异步性引入了复杂性,这会渗透到许多其他应用程序部分。通常,前端需要实现乐观更新和错误处理。

警告:您将放弃事件源的主要性能优势之一。在做决定之前三思而后行,并仔细考虑您对系统的假设,因为我们都有偏好“简单、同步世界”的倾向。

对于移动数据量较小的场景,由于同步执行不会导致性能问题,有时将模式切换回“同步”,其中投影在事件存储后直接更新是有用的。

如何强制投影(或其他事件监听器)同步运行?

您可以直接调用Neos\EventSourcing\EventListener\EventListenerInvoker::catchup()方法——这会调用投影器(以及其他需要的监听器)。

最好是创建一个服务,其中包含以下片段,以同步更新每个投影器

// $eventStore is created by EventStoreFactory::create()
// $someListener is the instanciated projector (a class implementing EventListenerInterface or ProjectorInterface)
//     usually $someListener can be injeced using @Flow\Inject(
// $dbalConnection is the database connection being used to read and update the "reading point" of the projector,
//     i.e. how many events it has already seen. (interally implemented by DoctrineAppliedEventsStorage, and by default
//     stored in the database table neos_eventsourcing_eventlistener_appliedeventslog).
//     In a Flow Application, you can retrieve this $dbalConnection most simply by using $this->entityManager->getConnection() - where
//     $this->entityManager is an injected instance of Doctrine\ORM\EntityManagerInterface. 
$eventListenerInvoker = new EventListenerInvoker($eventStore, $someListener, $dbalConnection);

$eventListenerInvoker->catchup();

事件源聚合

neos/event-sourcing包附带一个基类,可以用于实现事件源聚合

聚合构建

AbstractEventSourcedAggregateRoot类有一个私有构造函数。要创建一个新的聚合实例,您应该定义一个命名构造函数

<?php
declare(strict_types=1);
namespace Some\Package;

use Neos\EventSourcing\AbstractEventSourcedAggregateRoot;

final class SomeAggregate extends AbstractEventSourcedAggregateRoot
{
    /**
     * @var SomeAggregateId
     */
    private $id;

    public static function create(SomeAggregateId $id): self
    {
        $instance = new static();
        // This method will only be invoked once. Upon reconstitution only the when*() methods are called.
        // So we must never change the instance state directly (i.e. $instance->id = $id) but use events:
        $instance->recordThat(new SomeAggregateWasCreated($id));
        return $instance;
    }

    public function whenSomeAggregateWasCreated(SomeAggregateWasCreated $event): void
    {
        $this->id = $event->getId();
    }
}

聚合存储库

此框架不提供抽象存储库类,因为实现只需要几行代码,且没有可以提取的有用抽象。存储库只是事件存储和聚合类的一个瘦包装器

final class ProductRepository
{
    /**
     * @var EventStore
     */
    private $eventStore;

    public function __construct(EventStore $eventStore)
    {
        $this->eventStore = $eventStore;
    }

    public function load(SomeAggregateId $id): SomeAggregate
    {
        $streamName = $this->getStreamName($id);
        return SomeAggregate::reconstituteFromEventStream($this->eventStore->load($streamName));
    }

    public function save(SomeAggregate $aggregate): void
    {
        $streamName = $this->getStreamName($aggregate->id());
        $this->eventStore->commit($streamName, $aggregate->pullUncommittedEvents(), $aggregate->getReconstitutionVersion());
    }

    private function getStreamName(SomeAggregateId $id): StreamName
    {
        // we assume that the aggregate stream name is "some-aggregate-<aggregate-id>"
        return StreamName::fromString('some-aggregate-' . $id);
    }

}

教程

参见Tutorial.md

术语表

请参阅 Glossary.md

1:事件存储标识符是任意的,但为了防止命名冲突,建议在前面加上包键 – [返回](#user-content-a1) 2:Doctrine 事件存储使用为 Flow 配置的相同数据库连接,默认情况下在 neos_eventsourcing_eventstore_events 表中持久化事件 – 这可以调整,请参阅 Settings.yaml – [返回](#user-content-a2)