nixilla/cqrs-bundle

集成Prooph CQRS/ES库的Symfony Bundle

安装: 140

依赖: 0

建议者: 0

安全: 0

星星: 4

关注者: 2

分支: 3

开放问题: 0

类型:symfony-bundle

dev-master 2017-03-01 11:49 UTC

This package is auto-updated.

Last update: 2024-09-19 01:22:10 UTC


README

此Symfony Bundle将Prooph CQRS/ES库集成到Symfony项目中。

Build Status Coverage Status

安装

使用composer安装

composer require nixilla/cqrs-bundle

将bundle添加到AppKernel

<?php

// app/AppKernel.php

class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = [
            // other bundles here,
            new Nixilla\CqrsBundle\NixillaCqrsBundle()
        ];
        
        return $bundles;
    }
}

配置存储适配器。以下是一个设置MongoDB适配器的示例

# app/config/services.yml

parameters:

    mongodb_server: mongodb://localhost:27017
    mongodb_dbname: my_database

services:
    mongo.client:
        class: MongoClient
        arguments: [ "%mongodb_server%" ]
        
    mongo.event.store.adapter:
        class: Prooph\EventStore\Adapter\MongoDb\MongoDbEventStoreAdapter
        arguments: [ "@prooph.message.factory", "@prooph.message.converter", "@mongo.client", "%mongodb_dbname%" ]
# app/config/config.yml

nixilla_cqrs:
    event_store:
        adapter: mongo.event.store.adapter

使用

CQRS/ES在开发中非常简单且易于使用,但另一方面也非常耗时。对于您想在系统中记录的每个事件,您都需要创建几个类。以下我将向您展示如何使用此bundle创建简单的新闻通讯注册事件。

以下是我们将按顺序创建的CQRS/ES工件列表

  • CreateContact - 命令
  • CreateContactHandler - 命令处理器
  • ContactRepository - 与数据存储交互的类 - 类似于Doctrine存储库的概念
  • ContactCreated - 这是持久化到数据存储的事件类
  • MarketingNotificationListener - 一个示例监听器,用于发送Hipchat通知
  • CampaignMonitorContactCreatedProjector - 一个示例投影仪,用于更新Campaign Monitor

让我们从非CQRS的内容开始,那就是Symfony表单和控制器。我省略了一些代码(如构造函数和use语句)以使其更短。

<?php

namespace AppBundle\Form;

class BasicContactType extends AbstractType
{
    public function buildForm(FormBuilderInterface $builder, array $options)
    {
        $builder
            ->add('emailAddress', EmailType::class, [
                'constraints' => [ new NotBlank(), new Email() ]
            ])
        ;
    }
}
<?php

namespace AppBundle\Controller;

class ContactCreateController
{
    /** @var Prooph\ServiceBus\CommandBus */
    private $commandBus;

    /** @var Symfony\Component\Form\FormInterface */
    private $form;

    public function createAction(Request $request)
    {
        $this->form->submit(json_decode($request->getContent(), true));

        if($this->form->isValid())
        {
            $this->commandBus->dispatch(new CreateContact($this->form->getData()));
            return new Response('', 201);
        }

        return new Response($this->form->getErrors(true)->__toString(), 422);
    }
}

此bundle提供commandBus服务,但CreateContact命令是我们现在要创建的。这是第一个CQRS工件。

<?php

namespace Newsletter\Domain\Command;

class CreateContact extends Prooph\Common\Messaging\Command
{
    private $payload;

    public function __construct(array $payload)
    {
        $this->init();
        $this->setPayload($payload);
    }

    public function payload() { return $this->payload; }

    protected function setPayload(array $payload) { $this->payload = $payload; }
}

现在如果配置路由如下

# src/AppBundle/Resources/config/routing.yml

contact_create:
    path: /contact
    defaults: { _controller: controller.contact.create:createAction }
    methods: [ POST ]

那么您应该可以使用curl调用它,如下所示

curl -v \
    -H "Content-Type: application/json" \
    -X POST \
    --data '{"emailAddress":"john@smith.local"}' \
    http://localhost/contact

当您实际运行此命令时,您将得到异常 Message dispatch failed during locate-handler phase. Error: You have requested a non-existent service "handler.create_contact"。

handler.create_contact是我们接下来要创建的服务。此服务的唯一目的是处理CreateContact命令。换句话说,每个命令只有一个命令处理器,每个命令处理器只能处理一个命令。

<?php

namespace Newsletter\Domain\CommandHandler;

class CreateContactHandler
{
    /** @var Newsletter\Domain\Repository\ContactRepositoryInterface */
    private $repository;

    public function __invoke(CreateContact $command)
    {
        $this->repository->add(Contact::fromPayload($command->payload()));
    }
}

此处理器需要2个额外的CQRS工件。

第一个是ContactRepository,它通常有两个方法:getadd

<?php

namespace Newsletter\Domain\Repository;

interface ContactRepositoryInterface
{
    public function add(Newsletter\Domain\Aggregate\Contact $contact);
    public function get($id);
}

和实现

<?php

namespace AppBundle\Cqrs\Repository;

class ContactRepository implements Newsletter\Domain\Repository\ContactRepositoryInterface
{
    /** @var Prooph\EventStore\Aggregate\AggregateRepository */
    private $repository;

    public function add(Contact $contact) { $this->repository->addAggregateRoot($contact); }

    public function get($id) { return $this->repository->getAggregateRoot($id); }
}

第二个是事件,它是记录在数据存储中的。所有事件都是过去时态。好处是您不必编写任何代码——只需扩展基类即可。

<?php
namespace Newsletter\Domain\Event;

use Prooph\EventSourcing\AggregateChanged;

class ContactCreated extends AggregateChanged
{
}

现在您需要让Symfony知道如何构造服务

# src/AppBundle/Resources/config/services.yml

parameters:

    form.contact.basic.type.class: AppBundle\Form\BasicContactType

services:

    aggregate.type.contact:
        class: Prooph\EventStore\Aggregate\AggregateType
        factory: [ 'Prooph\EventStore\Aggregate\AggregateType', 'fromAggregateRootClass']
        arguments: [ 'Newsletter\Domain\Aggregate\Contact' ]

    repository.aggregate.contact:
        class: Prooph\EventStore\Aggregate\AggregateRepository
        arguments: [ "@prooph.event.store", "@aggregate.type.contact", "@prooph.aggregate.translator" ]

    repository.document:
        class: AppBundle\Cqrs\Repository\ContactRepository
        arguments: [ "@repository.aggregate.contact" ]

    handler.create_contact:
        class: Newsletter\Domain\CommandHandler\CreateContactHandler
        arguments: [ "@repository.document" ]

    form.contact.basic.type:
        class: "%form.contact.basic.type.class%"
        tags:
            - { name: "form.type" }

    form.contact.basic:
        class: Symfony\Component\Form\FormInterface
        factory: [ "@form.factory", create ]
        arguments: [ "%form.contact.basic.type.class%" ]

    controller.contact.create:
        class: AppBundle\Controller\ContactCreateController
        arguments: [ "@prooph.command.bus", "@form.contact.basic", "@prooph.event.publisher" ]

请注意,传递给控制器的参数数量多于它接受的参数数量。@todo解释原因

现在如果您运行上面列出的curl命令,您应该得到HTTP 201 Created,并且您应该在MongoDB中看到此记录

{
    "_id" : "2d44331b-9d0d-47fa-b5be-01cd247c8a70",
    "version" : 1,
    "event_name" : "Newsletter\\Domain\\Event\\ContactCreated",
    "payload" : {
        "emailAddress" : "john@smith.local"
    },
    "created_at" : "2017-02-28T16:51:27.414000",
    "aggregate_id" : "john@smith.local",
    "aggregate_type" : "Newsletter\\Domain\\Aggregate\\Contact",
    "causation_id" : "21cd3129-0216-4437-86bc-9d7ede0bb08c",
    "causation_name" : "Newsletter\\Domain\\Command\\CreateContact"
}

由于命令和命令处理器之间存在一对一的关系,因此此bundle假定以下配置

  • 如果命令是\Any\Namespace\SomeCommand,则处理此命令的服务ID应称为handler.some_command
  • 命名空间被忽略,仅使用类名
  • 类名使用Symfony自己的CamelCaseToSnakeCaseNameConverter转换为snake_case

监听器和投影仪

尽管Prooph库不区分监听器和投影仪,但此bundle支持这种分离。

为什么要分离它?ES - Event Sourcing是一个将数据作为一系列事件存储的概念。在某个时候,您将想要回放所有事件并构建新的读取模型。这可能在各种情况下发生,例如当有人问您

  • “新闻通讯每月订阅频率是多少?”
  • 或“您能将此新闻通讯发送给所有在八月份订阅的人吗?”

该理念是只在事件实际发生时(实时)运行监听器。例如,您可能希望通知市场团队他们有新的通讯录注册。您不希望在重新播放事件以编译新的读取模型或新的报告时执行监听器。

另一方面,投影器可以根据需要运行多次。投影器的作用是更新读取模型。更新读取模型可能包括以下操作:

  • 编写SQL命令读取数据库
  • 将缓存写入Varnish
  • 直接将数据写入ElasticSearch
  • 构建静态HTML文件

一个监听器可能看起来像这样

<?php

namespace AppBundle\Cqrs\Listeners;

class MarketingNotificationListener
{
    /**
     * @var HipchatNotifier (from nixilla/hipchat-bundle)
     */
    private $notifier;
    
    public function __invoke(ContactCreated $event)
    {
        $payload = $event->payload();
        $message = sprintf("New Newsletter subscription, email address '%s'", $payload['emailAddress']);
        $this->notifier->notify('red', $message, 'text', true);
    }
}

为了让它工作,您需要让Symfony知道如何注入它。

# src/AppBundle/Resources/config/services.yml

services:

    listener.contact_created:
        class: AppBundle\Cqrs\Listeners\MarketingNotificationListener
        arguments: [ "@hipchat.notifier" ]
        tags:
            - { name: cqrs.event.listener, event: Newsletter\Domain\Event\ContactCreated }

单个事件可以有多个事件监听器和多个投影器。因此,为了配置监听器,您需要用{ name: cqrs.event.listener, event: Newsletter\Domain\Event\ContactCreated }标签服务。这将把监听器添加到给定事件类的监听器数组中,当该事件发生时,所有监听器都将被执行。

一个示例投影器可能看起来像这样

<?php

namespace AppBundle\Cqrs\Projectors;

class CampaignMonitorContactCreatedProjector
{
    private $campaignMonitor;
    
    public function __invoke(ContactCreated $event)
    {
        $payload = $event->payload();
        $this->campaignMonitor->subscribe($payload['emailAddress'], $listId = 'my list id');
    }
}

配置方式与上面监听器部分中看到的方式类似,但重要的是要用cqrs.event.projector标签。

# src/AppBundle/Resources/config/services.yml

services:

    projector.campaign_monitor.contact_created:
        class: AppBundle\Cqrs\Projectors\CampaignMonitorContactCreatedProjector
        arguments: [ "@campaign.monitor" ]
        tags:
            - { name: cqrs.event.projector, event: Newsletter\Domain\Event\ContactCreated }