agluh / outbox-bundle

实现了基于DDD的Symfony应用的Outbox模式

安装: 16

依赖项: 0

建议者: 0

安全: 0

星标: 8

关注者: 2

分支: 0

开放问题: 0

类型:symfony-bundle

v0.1.0 2020-09-03 12:25 UTC

This package is auto-updated.

Last update: 2024-09-17 03:19:20 UTC


README

Build Status Latest Stable Version Total Downloads License

为基于DDD的Symfony应用实现了Outbox模式

工作原理

  1. Bundle收集正在持久化的聚合域事件,并将它们在一个单独的事务中保存到数据库的另一个表中。
  2. 成功提交后,这些域事件将被排队以供发布。
    1. 如果Bundle配置了auto_publish=true选项,那么来自Outbox表的域事件将使用内核的TERMINATE或console.TERMINATE事件中的Symfony事件监听器进行处理。
    2. 如果Bundle配置了auto_publish=false选项,那么您应该使用以下CLI接口定期运行工作进程来处理存储的事件。

重要说明:事件将按照期望的发布日期(默认为注册域事件的日期)的升序顺序逐个排队以供发布。如果在DomainEventEnqueuedForPublishingEvent中未将域事件标记为已发布(即未设置发布日期),则Outbox将在下一次尝试将相同的域事件排队以供发布,直到成功。这确保了已发布域事件的时间一致性。

注意:您可以同时将自动发布与基于CLI的发布结合起来。锁定机制确保所有事件都将按正确顺序发布。

致谢

灵感来源于Domain Event Bundle。工作类主要基于symfony/messenger组件中的Worker。

安装

需要Symfony 4.4,或Symfony 5.x和PHP 7.4及更高版本

确保已全局安装Composer,如Composer文档中的安装章节所述。

使用Symfony Flex的应用程序

打开命令行,进入您的项目目录,并执行以下命令

$ composer require agluh/outbox-bundle

未使用Symfony Flex的应用程序

步骤1:下载Bundle

打开命令行,进入您的项目目录,并执行以下命令以下载此Bundle的最新稳定版本

$ composer require agluh/outbox-bundle

步骤2:启用Bundle

然后,通过将其添加到项目config/bundles.php文件中注册的Bundle列表来启用Bundle

// config/bundles.php

return [
    // ...
    AGluh\Bundle\OutboxBundle\OutboxBundle::class => ['all' => true],
];

用法

Outbox Bundle主要通过应用程序事件集成到您的Symfony应用中。您应该实现以下示例中的监听器。

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use AGluh\Bundle\OutboxBundle\Event\AggregateRootPreparedForPersistenceEvent;
use AGluh\Bundle\OutboxBundle\Event\DomainEventPreparedForPersistenceEvent;
use AGluh\Bundle\OutboxBundle\Event\DomainEventEnqueuedForPublishingEvent;
use AGluh\Bundle\OutboxBundle\Event\DomainEventPublishedEvent;

class OutboxIntegrationEventsListener implements EventSubscriberInterface
{
    /**
     * In this method you should collect domain events from your aggregate root
     * and than call $event->addDomainEvent() to persist them to outbox.
     */
    public function onAggregateRootPreparedForPersistence (AggregateRootPreparedForPersistenceEvent $event): void
    {
        $aggregateRoot = $event->aggregateRoot();
         
        /**
         * Example: DomainEventPublisher is a base class or an interface
         * of your aggregate.
         */
        if($aggregateRoot instanceof DomainEventPublisher) {
    
            /**
             * Here DomainEventPublisher::popEvents() is the method
             * that returns array of domain events for aggregate.
             */
            foreach ($aggregateRoot->popEvents() as $domainEvent) {
                /**
                 * Basically domain event can be any PHP type that supports serialization.
                 * See Serialization section below in docs.
                 */
                $event->addDomainEvent($domainEvent);
            }
        }
    }

    /**
     * In this method you can alter date of expected publication for domain event.
     * By default it will be the date of registration of the event in outbox.
     */
    public function onDomainEventPreparedForPersistence(DomainEventPreparedForPersistenceEvent $event): void
    {
        $domainEvent = $event->domainEvent();
        
        /**
         * Here DomainEvent is an interface or base class for your domain event.
         */
        if($domainEvent instanceof DomainEvent) {
            
            /**
             * In this example we use event occurrence date as date of expected publication.
             */
            $event->changeExpectedPublicationDate($domainEvent->occurrenceDate());
        }
    }
    
    /**
     * This function will be called by outbox bundle for each domain event should be published.
     */
    public function onDomainEventEnqueuedForPublishing(DomainEventEnqueuedForPublishingEvent $event): void
    {
        // It makes sense to stop propagation for event
        $event->stopPropagation();

        $domainEvent = $event->domainEvent();

        // Do whatever you mention under 'publish event' here. For example, send message to RabbitMQ.
        
        // You MUST set publication date here to mark event as published in outbox table.
        $event->setPublicationDate(new \DateTimeImmutable());
    }
    
    /**
     * This function will be called after outbox bundle persists domain event as published.
     */
    public function onDomainEventPublished(DomainEventPublishedEvent $event): void
    {
        // Do something if you want
    }

    public static function getSubscribedEvents(): array
    {
        return [
            AggregateRootPreparedForPersistenceEvent::class => 'onAggregateRootPreparedForPersistence',
            DomainEventPreparedForPersistenceEvent::class => 'onDomainEventPreparedForPersistence',
            DomainEventEnqueuedForPublishingEvent::class => 'onDomainEventEnqueuedForPublishing',
            DomainEventPublishedEvent::class => 'onDomainEventPublished'
        ];
    }
}

控制台命令

发布域事件

outbox:publish [options]

Options:
  -l, --limit=LIMIT                Limit the number of published events
  -m, --memory-limit=MEMORY-LIMIT  The memory limit the worker can consume
  -t, --time-limit=TIME-LIMIT      The time limit in seconds the worker can run
  -s, --sleep=SLEEP                Seconds to sleep before asking for new unpublished events after no unpublished events were found. Applicable only for demonized worker [default: 1]
  -d, --daemonize                  Daemonize worker
  -b, --batch-size=BATCH-SIZE      Limit the number of events worker can query at every iteration [default: 20]
  -h, --help                       Display this help message
  -q, --quiet                      Do not output any message
  -V, --version                    Display this application version
      --ansi                       Force ANSI output
      --no-ansi                    Disable ANSI output
  -n, --no-interaction             Do not ask any interactive question
  -e, --env=ENV                    The Environment name. [default: "dev"]
      --no-debug                   Switches off debug mode.
  -v|vv|vvv, --verbose             Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

PHP进程不是为长时间运行而设计的。因此,它必须定期退出。或者,命令可能因为错误或异常而退出。必须有人将其带回来并继续事件发布(如果未使用自动发布)。您可以使用Supervisord来做这件事。它启动进程并监视它们的工作状态。

以下是一个supervisord配置示例。

[program:outbox_worker]
command=/path/to/bin/console outbox:publish --env=prod --no-debug --time-limit=3600 --daemonize
process_name=%(program_name)s_%(process_num)02d
numprocs=1 # There is no reason to use multiple workers here because of locking nature of events publication process
autostart=true
autorestart=true
startsecs=0
redirect_stderr=true

注意:注意--time-limit,它告诉命令在60分钟后退出。

修剪已发布的域事件

outbox:prune-published

停止工作进程

outbox:stop-workers

自定义序列化器

默认情况下,Outbox Bundle使用PHP的serialize/unserialize函数在持久化过程中将域事件转换为字符串。您可以使用自定义序列化器来实现这一点。以下是如何使用symfony/serializer的示例。

namespace App\Service;

use AGluh\Bundle\OutboxBundle\Serialization\SerializerInterface;
use AGluh\Bundle\OutboxBundle\Exception\DomainEventDecodingFailedException;

class CustomSerializer implements SerializerInterface
{
    private \Symfony\Component\Serializer\SerializerInterface $serializer;
    
    // Constructor skipped

    public function encode($domainEvent): string
    {
        return $this->serializer->serialize($domainEvent, 'json');
    }
    
    /**
     * @throws DomainEventDecodingFailedException
     */
    public function decode(string $data)
    {
        // In this example we don't convert json back to an object and simply use it further
        return $data;
    }
}

然后注册新的序列化器作为服务。

# config\services.yaml
services:
    agluh_outbox_bundle.serializer:
        class: App\Service\CustomSerializer

默认配置

agluh_outbox:
    table_name: outbox      # Name of outbox table for Doctrine mapping
    auto_publish: false     # Publish domain events on kernel.TERMINATE or console.TERMINATE

贡献

欢迎贡献。已为您的方便配置了Composer脚本

> composer test       # Run test suite (you should set accessible MySQL server with DATABASE_URL env variable)
> composer cs         # Run coding standards checks
> composer cs-fix     # Fix coding standards violations
> composer static     # Run static analysis with Phpstan