agluh / outbox-bundle
实现了基于DDD的Symfony应用的Outbox模式
Requires
- php: ^7.4
- doctrine/doctrine-bundle: ^1.0 || ^2.0
- doctrine/orm: ^2.6
- ramsey/uuid-doctrine: ^1.6
- symfony/console: ^4.4 || ^5.0
- symfony/framework-bundle: ^4.4 || ^5.0
- symfony/lock: ^4.4 || ^5.0
- webmozart/assert: ^1.9
Requires (Dev)
- dama/doctrine-test-bundle: ^6.3
- doctrine/doctrine-fixtures-bundle: ^3.3
- matthiasnoback/symfony-dependency-injection-test: ^4.1
- nyholm/symfony-bundle-test: ^1.6
- phpstan/phpstan: ^0.12.40
- roave/security-advisories: dev-master
- symfony/dotenv: ^4.4 || ^5.0
- symfony/phpunit-bridge: ^4.4 || ^5.0
- symplify/easy-coding-standard: ^8.1
This package is auto-updated.
Last update: 2024-09-17 03:19:20 UTC
README
为基于DDD的Symfony应用实现了Outbox模式。
工作原理
- Bundle收集正在持久化的聚合域事件,并将它们在一个单独的事务中保存到数据库的另一个表中。
- 成功提交后,这些域事件将被排队以供发布。
- 如果Bundle配置了
auto_publish=true
选项,那么来自Outbox表的域事件将使用内核的TERMINATE或console.TERMINATE事件中的Symfony事件监听器进行处理。 - 如果Bundle配置了
auto_publish=false
选项,那么您应该使用以下CLI接口定期运行工作进程来处理存储的事件。
- 如果Bundle配置了
重要说明:事件将按照期望的发布日期(默认为注册域事件的日期)的升序顺序逐个排队以供发布。如果在DomainEventEnqueuedForPublishingEvent中未将域事件标记为已发布(即未设置发布日期),则Outbox将在下一次尝试将相同的域事件排队以供发布,直到成功。这确保了已发布域事件的时间一致性。
注意:您可以同时将自动发布与基于CLI的发布结合起来。锁定机制确保所有事件都将按正确顺序发布。
致谢
灵感来源于Domain Event Bundle。工作类主要基于symfony/messenger组件中的Worker。
安装
需要Symfony 4.4,或Symfony 5.x和PHP 7.4及更高版本
确保已全局安装Composer,如Composer文档中的安装章节所述。
使用Symfony Flex的应用程序
打开命令行,进入您的项目目录,并执行以下命令
$ composer require agluh/outbox-bundle
未使用Symfony Flex的应用程序
步骤1:下载Bundle
打开命令行,进入您的项目目录,并执行以下命令以下载此Bundle的最新稳定版本
$ composer require agluh/outbox-bundle
步骤2:启用Bundle
然后,通过将其添加到项目config/bundles.php
文件中注册的Bundle列表来启用Bundle
// config/bundles.php return [ // ... AGluh\Bundle\OutboxBundle\OutboxBundle::class => ['all' => true], ];
用法
Outbox Bundle主要通过应用程序事件集成到您的Symfony应用中。您应该实现以下示例中的监听器。
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use AGluh\Bundle\OutboxBundle\Event\AggregateRootPreparedForPersistenceEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventPreparedForPersistenceEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventEnqueuedForPublishingEvent; use AGluh\Bundle\OutboxBundle\Event\DomainEventPublishedEvent; class OutboxIntegrationEventsListener implements EventSubscriberInterface { /** * In this method you should collect domain events from your aggregate root * and than call $event->addDomainEvent() to persist them to outbox. */ public function onAggregateRootPreparedForPersistence (AggregateRootPreparedForPersistenceEvent $event): void { $aggregateRoot = $event->aggregateRoot(); /** * Example: DomainEventPublisher is a base class or an interface * of your aggregate. */ if($aggregateRoot instanceof DomainEventPublisher) { /** * Here DomainEventPublisher::popEvents() is the method * that returns array of domain events for aggregate. */ foreach ($aggregateRoot->popEvents() as $domainEvent) { /** * Basically domain event can be any PHP type that supports serialization. * See Serialization section below in docs. */ $event->addDomainEvent($domainEvent); } } } /** * In this method you can alter date of expected publication for domain event. * By default it will be the date of registration of the event in outbox. */ public function onDomainEventPreparedForPersistence(DomainEventPreparedForPersistenceEvent $event): void { $domainEvent = $event->domainEvent(); /** * Here DomainEvent is an interface or base class for your domain event. */ if($domainEvent instanceof DomainEvent) { /** * In this example we use event occurrence date as date of expected publication. */ $event->changeExpectedPublicationDate($domainEvent->occurrenceDate()); } } /** * This function will be called by outbox bundle for each domain event should be published. */ public function onDomainEventEnqueuedForPublishing(DomainEventEnqueuedForPublishingEvent $event): void { // It makes sense to stop propagation for event $event->stopPropagation(); $domainEvent = $event->domainEvent(); // Do whatever you mention under 'publish event' here. For example, send message to RabbitMQ. // You MUST set publication date here to mark event as published in outbox table. $event->setPublicationDate(new \DateTimeImmutable()); } /** * This function will be called after outbox bundle persists domain event as published. */ public function onDomainEventPublished(DomainEventPublishedEvent $event): void { // Do something if you want } public static function getSubscribedEvents(): array { return [ AggregateRootPreparedForPersistenceEvent::class => 'onAggregateRootPreparedForPersistence', DomainEventPreparedForPersistenceEvent::class => 'onDomainEventPreparedForPersistence', DomainEventEnqueuedForPublishingEvent::class => 'onDomainEventEnqueuedForPublishing', DomainEventPublishedEvent::class => 'onDomainEventPublished' ]; } }
控制台命令
发布域事件
outbox:publish [options] Options: -l, --limit=LIMIT Limit the number of published events -m, --memory-limit=MEMORY-LIMIT The memory limit the worker can consume -t, --time-limit=TIME-LIMIT The time limit in seconds the worker can run -s, --sleep=SLEEP Seconds to sleep before asking for new unpublished events after no unpublished events were found. Applicable only for demonized worker [default: 1] -d, --daemonize Daemonize worker -b, --batch-size=BATCH-SIZE Limit the number of events worker can query at every iteration [default: 20] -h, --help Display this help message -q, --quiet Do not output any message -V, --version Display this application version --ansi Force ANSI output --no-ansi Disable ANSI output -n, --no-interaction Do not ask any interactive question -e, --env=ENV The Environment name. [default: "dev"] --no-debug Switches off debug mode. -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
PHP进程不是为长时间运行而设计的。因此,它必须定期退出。或者,命令可能因为错误或异常而退出。必须有人将其带回来并继续事件发布(如果未使用自动发布)。您可以使用Supervisord来做这件事。它启动进程并监视它们的工作状态。
以下是一个supervisord配置示例。
[program:outbox_worker] command=/path/to/bin/console outbox:publish --env=prod --no-debug --time-limit=3600 --daemonize process_name=%(program_name)s_%(process_num)02d numprocs=1 # There is no reason to use multiple workers here because of locking nature of events publication process autostart=true autorestart=true startsecs=0 redirect_stderr=true
注意:注意--time-limit,它告诉命令在60分钟后退出。
修剪已发布的域事件
outbox:prune-published
停止工作进程
outbox:stop-workers
自定义序列化器
默认情况下,Outbox Bundle使用PHP的serialize/unserialize函数在持久化过程中将域事件转换为字符串。您可以使用自定义序列化器来实现这一点。以下是如何使用symfony/serializer的示例。
namespace App\Service; use AGluh\Bundle\OutboxBundle\Serialization\SerializerInterface; use AGluh\Bundle\OutboxBundle\Exception\DomainEventDecodingFailedException; class CustomSerializer implements SerializerInterface { private \Symfony\Component\Serializer\SerializerInterface $serializer; // Constructor skipped public function encode($domainEvent): string { return $this->serializer->serialize($domainEvent, 'json'); } /** * @throws DomainEventDecodingFailedException */ public function decode(string $data) { // In this example we don't convert json back to an object and simply use it further return $data; } }
然后注册新的序列化器作为服务。
# config\services.yaml services: agluh_outbox_bundle.serializer: class: App\Service\CustomSerializer
默认配置
agluh_outbox: table_name: outbox # Name of outbox table for Doctrine mapping auto_publish: false # Publish domain events on kernel.TERMINATE or console.TERMINATE
贡献
欢迎贡献。已为您的方便配置了Composer脚本
> composer test # Run test suite (you should set accessible MySQL server with DATABASE_URL env variable)
> composer cs # Run coding standards checks
> composer cs-fix # Fix coding standards violations
> composer static # Run static analysis with Phpstan