chapa-php / infrastructure
Requires
- chapa-php/application: ^0.1.0
- ecotone/enqueue: ^1.200
- ecotone/jms-converter: ^1.200
- ecotone/lite-application: ^1.200
- ecotone/redis: ^1.200
- ecotone/sqs: ^1.200
- enqueue/enqueue: ^0.10.19
- enqueue/rdkafka: ^0.10.19
- predis/predis: ^1.1
- ramsey/uuid: ^4.7
Requires (Dev)
- captainhook/captainhook: ^5.16
- captainhook/plugin-composer: ^5.3
- friendsofphp/php-cs-fixer: ^3.23
- jangregor/phpstan-prophecy: ^1.0
- marcocesarato/php-conventional-changelog: ^1.17
- mockery/mockery: ^1.6
- pestphp/pest: ^2.16
- pestphp/pest-plugin-drift: ^2.3
- pestphp/pest-plugin-faker: ^2.0
- pestphp/pest-plugin-type-coverage: ^2.0
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^1.10
- phpstan/phpstan-beberlei-assert: ^1.1
- phpstan/phpstan-mockery: ^1.1
- phpstan/phpstan-phpunit: ^1.3
- phpstan/phpstan-strict-rules: ^1.5
- phpunit/php-code-coverage: ^10.1
- ramsey/conventional-commits: ^1.5
- struggle-for-php/sfp-phpstan-psr-log: ^0.13.0
- timeweb/phpstan-enum: ^3.1
This package is auto-updated.
Last update: 2023-12-01 03:11:46 UTC
README
为项目基础设施层提供抽象和标准化包。
消息总线
虽然该模块对框架是中立的,但我们将使用框架 Laravel 作为示例。
目前我们在底层使用 Lib Ecotone 作为消息总线,可以根据需要替换,只需遵守 MessageBusInterface 合同即可。
如果需要以纯形式使用消息总线,消息总线有一个名为 getRawBus
的函数,它返回配置的 bus 实例。
可能有帮助的链接: Ecotone
配置
为了开始在项目中使用 CQRS 和 EDA 标准,需要配置一个消息总线(commands/queries/events)。
在其依赖注入文件 App/Providers/AppServiceProvider.php 中注册
$this->app->singleton(JsonToPhpConverter::class, JsonToPhpConverter::class); $this->app->singleton(PhpToJsonConverter::class, PhpToJsonConverter::class); $this->app->singleton( MessageBusInterface::class, fn() => (new EcotoneLiteMessageBus()) ->withAvailableServices($this->app) ->withNamespaces(['App', 'FreightPayments']) ->withServiceName('escrow') ->run() );
其中
- JsonToPhpConverter 和 PhpToJsonConverter - Ecotone 的标准转换器。
- MessageBusInterface - 消息总线接口,用于控制反转。
- EcotoneLiteMessageBus - 消息总线(Ecotone)的标准实现
- withAvailableServices - 通知 Ecotone 容器或对象数组以进行库的推断和运行时依赖注入。
- withNamespaces - 通知 Ecotone 哪些命名空间应进行分析以捕获其注解。
- withServiceName - 通知 Ecotone 服务名称。
- run - 启动 Ecotone。
注意:此配置应在应用程序级别进行。
命令/查询
在配置消息总线后,将文件添加到功能依赖注入文件中
$this->app->when(Feature::class)->needs(Dispatcher::class)->give( function () { return new DispatcherBus($this->app->make(MessageBusInterface::class)); } );
其中
- Dispatcher - 消息分发器接口,用于控制反转。
- DispatcherBus - 消息分发器的实现,接受上面配置的消息总线实例作为参数。
分发命令/查询
命令的发送在技术上与查询相同,因此配置相同。
class FeatureController extends Controller { public function __construct( private readonly Dispatcher $dispatcher, private readonly ActionFactory $factory, ) { } public function __invoke(FeatureReq $req): JsonResponse { $request = $req->validated(); $command = $this->factory->create(FeatureActions::action, $request); $event = $this->dispatcher->dispatch($command); // ... } }
接收命令/查询
由于命令和查询的相似性,接收也遵循同样的规则。
在 feature/Infrastructure/Cqrs 文件夹中创建文件并添加
class FeatureCommandHandler { public function __construct(private FeatureHandler $handler) { } #[CommandHandler] public function createdNotification(string $event): void { $result = $this->handler->handle($event); if ($result->isFailure()) { throw $result->getError(); } } }
此文件作为应用程序层的桥梁,同时隔离命令总线(Ecotone)的知识和注解,从而使应用程序层对 bus 的细节保持无知。
其中
- FeatureHandler - 注入应用程序层的处理程序以执行编排规则。
- #[CommandHandler] - 指示接收的消息是命令类型。
创建文件后,需要将其注入到 DI 容器中,在文件 feature/Infrastructure/Providers/InfrastructureProvider.php 中添加
$this->app->bind(FeatureCommandHandler::class, FeatureCommandHandler::class, true);
事件
对于事件分派,我们将使用之前为命令/查询已实现的相同配置,因为分派器已经为三种类型的发送提供了通用实现。对于事件,需要配置消息将被发送到的队列/主题,这与在内存中执行的命令/查询不同。
第一步是配置ecotone的消费者命令,因为这些命令对于显示和/或列出应用程序中可用的端点事件是必需的。
在App/Commands文件夹下创建一个MessageBus文件夹,并添加两个文件
declare(strict_types=1); namespace App\Console\Commands\MessageBus; use ChapaPhp\Infrastructure\MessageBus\MessageBusInterface; use Illuminate\Console\Command; class MessageBusListCommand extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'message-bus:list'; /** * The console command description. * * @var string */ protected $description = 'list message bus channel consumers'; /** * Execute the console command. * * @return int */ public function handle(MessageBusInterface $messageBus): void { $command = $messageBus->listConsumersCommand(); $this->table($command['header'], $command['rows']); } }
declare(strict_types=1); namespace App\Console\Commands\MessageBus; use ChapaPhp\Infrastructure\MessageBus\MessageBusInterface; use Illuminate\Console\Command; class MessageBusRunCommand extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'message-bus:run {channelName} {verb=vvv}'; /** * The console command description. * * @var string */ protected $description = 'run message bus channel consumer'; /** * Execute the console command. * * @return int */ public function handle(MessageBusInterface $messageBus): void { $messageBus->runConsumerCommand($this->argument('channelName'), $this->argument('verb')); } }
如果一切顺利,执行php artisan list
命令将显示已配置的命令。
在app文件夹中,创建一个Ecotone文件夹,并在其中创建配置文件
class EcotoneChannelProvider { #[ServiceContext] public function enableEscrowChannel() { return [ KafkaDistribuitedBusConfiguration::createPublisher( busReferenceName: EscrowBus::class, topicName: env('KAFKA_ESCROW_TOPIC_NAME'), ), KafkaDistribuitedBusConfiguration::createConsumer( topicName: env('KAFKA_ESCROW_TOPIC_NAME'), endpointId: 'consumer', ), PollingMetadata::create('consumer') ->setEnabledErrorChannel(true) ->setErrorChannelName('errorChannelPublisher'), ]; } }
其中
- #[ServiceContext] - Ecotone的注解,用于指示服务配置
- KafkaDistribuitedBusConfiguration::createPublisher - 创建事件发布者的驱动程序,在本例中,它是一个具有Distributed发送类型的kafka代理
- busReferenceName(可选) - 有时需要发送多个事件,此参数告诉ecotone在引用此参数值时可以调用此主题的配置(如下面的示例),如果没有指定,则将作为一个默认发布者工作。
- topicName - 指定接收事件的主题名称(在本例中,kafka将接收事件)
- KafkaDistribuitedBusConfiguration::createConsumer - 创建事件消费者的驱动程序,在本例中,它是一个具有Distributed发送类型的kafka代理
- topicName - 指定消费者将连接以消费事件的主题名称。
- endpointId - 消费者通道的别名,此名称将在执行
php artisan message-bus:list
命令时显示
- PollingMetadata::create - 创建连接池,以防事件处理发生错误,负责将'失败'消息重定向到Dead Letter Queue(DLQ)
- setEnabledErrorChannel(true) - 启用DLQ
- setErrorChannelName('errorChannelPublisher') - 指定DLQ的服务激活器。
发送事件
事件发送在技术上与查询相同,因此配置对两者都相同。
在某些情况下,需要发送多个事件,为此有一些额外的配置
在app/ecotone/[configuration].php文件中,在其配置中添加busReferenceName,并提供对接口的引用(如上例所示)。在依赖注入文件中添加对分发器的新的注入,建议使用laravel的类型变体。
分发器有一个withPublisherBusReferenceName
函数,该函数接收busReferenceName配置中指定的引用。例如
$this->app->when(CreateFeatureController::class)->needs(TransactionDispatcher::class)->give( function () { $intance = $this->app->make(MessageBusInterface::class); return (new TransactionDispatcher($this->app->make(TransactionBus::class)))->withPublisherBusReferenceName(TransactionBus::class); } );
接收事件
在feature/Infrastructure/Eda中添加一个文件
class FeatureEventHandler { public function __construct(private FeatureHandler $handler) { } #[Distributed] #[EventHandler(listenTo: "Domain\\Events\\FeatureCreated")] public function createdNotification(FeatureEvent $event): void { $result = $this->handler->handle($event); if ($result->isFailure()) { throw $result->getError(); } } }
其中
- #[Distributed] - 指定配置的驱动程序类型,在上面的配置中,默认配置是distributed
- #[EventHandler(listenTo: "Domain\Events\FeatureCreated")] - 指定该函数是事件处理程序
- listenTo - 指定该处理程序将消费的事件路由,默认路由是事件自身的命名空间
与命令/查询类似,此文件具有将应用层作为桥梁的功能。
在创建命令后,在功能的infrastructure/creator文件夹中添加创建builder和director的文件
class FeatureEventBuilder implements Builder { private ?string $id = null; private ?array $payload = null; private ?array $headers = null; public function build(): Feature { return new Feature($this->id, $this->headers, $this->payload); } public function withId(string $id): self { $this->id = $id; return $this; } public function withHeaders(array $headers): self { $this->headers = $headers; return $this; } public function withPayload(array $payload): self { $this->payload = $payload; return $this; } public function targetType(): string { return Feature::class; } }
/** * @implements Director<FeaturePlaced, array> */ class FeaturePlacedEventDirector implements Director { public function __construct( private readonly FeaturePlacedEventBuilder $builder, ) { } public function make($data) { return $this->builder ->withId($data['messageHeader']['Identifier'] ?? '') ->withHeaders($data['messageHeader']) ->withPayload($data['data']) ->build(); } public function targetType(): string { return $this->builder->targetType(); } }
创建后,应在Ecotone的JsonToPhpConverter的构造函数中注入事件工厂,例如
$this->app->singleton(JsonToPhpConverter::class, function () { $factory = new AbstractJsonToPhpFactory(); $factory->addDirector(new FeatureEventDirector(new FeatureEventBuilder())); return new JsonToPhpConverter($factory); });
要启动事件消费者,请执行命令 php artisan message-bus:run {consumer}
,其中 {consumer} 是在 App/Ecotone/[configuration].php 文件中配置的 endpointId 键对应的名称。