chapa-php/infrastructure

该软件包最新版本(0.10.0)没有提供许可证信息。


README

为项目基础设施层提供抽象和标准化包。

消息总线

虽然该模块对框架是中立的,但我们将使用框架 Laravel 作为示例。

目前我们在底层使用 Lib Ecotone 作为消息总线,可以根据需要替换,只需遵守 MessageBusInterface 合同即可。

如果需要以纯形式使用消息总线,消息总线有一个名为 getRawBus 的函数,它返回配置的 bus 实例。

可能有帮助的链接: Ecotone

配置

为了开始在项目中使用 CQRSEDA 标准,需要配置一个消息总线(commands/queries/events)。

在其依赖注入文件 App/Providers/AppServiceProvider.php 中注册

$this->app->singleton(JsonToPhpConverter::class, JsonToPhpConverter::class);
$this->app->singleton(PhpToJsonConverter::class, PhpToJsonConverter::class);
$this->app->singleton(
            MessageBusInterface::class,
            fn() => (new EcotoneLiteMessageBus())
                ->withAvailableServices($this->app)
                ->withNamespaces(['App', 'FreightPayments'])
                ->withServiceName('escrow')
                ->run()
        );

其中

  • JsonToPhpConverter 和 PhpToJsonConverter - Ecotone 的标准转换器。
  • MessageBusInterface - 消息总线接口,用于控制反转。
  • EcotoneLiteMessageBus - 消息总线(Ecotone)的标准实现
    • withAvailableServices - 通知 Ecotone 容器或对象数组以进行库的推断和运行时依赖注入。
    • withNamespaces - 通知 Ecotone 哪些命名空间应进行分析以捕获其注解。
    • withServiceName - 通知 Ecotone 服务名称。
    • run - 启动 Ecotone。

注意:此配置应在应用程序级别进行。

命令/查询

在配置消息总线后,将文件添加到功能依赖注入文件中

$this->app->when(Feature::class)->needs(Dispatcher::class)->give(
            function () {
                return new DispatcherBus($this->app->make(MessageBusInterface::class));
            }
        );

其中

  • Dispatcher - 消息分发器接口,用于控制反转。
  • DispatcherBus - 消息分发器的实现,接受上面配置的消息总线实例作为参数。

分发命令/查询

命令的发送在技术上与查询相同,因此配置相同。

class FeatureController extends Controller
{
   public function __construct(
       private readonly Dispatcher $dispatcher,
       private readonly ActionFactory $factory,
   ) {
   }

   public function __invoke(FeatureReq $req): JsonResponse
   {
       $request = $req->validated();
       $command = $this->factory->create(FeatureActions::action, $request);
       $event = $this->dispatcher->dispatch($command);
      // ...
   }
}

接收命令/查询

由于命令和查询的相似性,接收也遵循同样的规则。

在 feature/Infrastructure/Cqrs 文件夹中创建文件并添加

class FeatureCommandHandler
{
 public function __construct(private FeatureHandler $handler)
    {
    }

     #[CommandHandler]
    public function createdNotification(string $event): void
    {
         $result = $this->handler->handle($event);
         if ($result->isFailure()) {
             throw $result->getError();
         }
    }
}

此文件作为应用程序层的桥梁,同时隔离命令总线(Ecotone)的知识和注解,从而使应用程序层对 bus 的细节保持无知。

其中

  • FeatureHandler - 注入应用程序层的处理程序以执行编排规则。
  • #[CommandHandler] - 指示接收的消息是命令类型。

创建文件后,需要将其注入到 DI 容器中,在文件 feature/Infrastructure/Providers/InfrastructureProvider.php 中添加

 $this->app->bind(FeatureCommandHandler::class, FeatureCommandHandler::class, true);

事件

对于事件分派,我们将使用之前为命令/查询已实现的相同配置,因为分派器已经为三种类型的发送提供了通用实现。对于事件,需要配置消息将被发送到的队列/主题,这与在内存中执行的命令/查询不同。

第一步是配置ecotone的消费者命令,因为这些命令对于显示和/或列出应用程序中可用的端点事件是必需的。

在App/Commands文件夹下创建一个MessageBus文件夹,并添加两个文件

declare(strict_types=1);

namespace App\Console\Commands\MessageBus;

use ChapaPhp\Infrastructure\MessageBus\MessageBusInterface;
use Illuminate\Console\Command;

class MessageBusListCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'message-bus:list';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'list message bus channel consumers';

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle(MessageBusInterface $messageBus): void
    {
        $command = $messageBus->listConsumersCommand();
        $this->table($command['header'], $command['rows']);
    }
}
declare(strict_types=1);

namespace App\Console\Commands\MessageBus;

use ChapaPhp\Infrastructure\MessageBus\MessageBusInterface;
use Illuminate\Console\Command;

class MessageBusRunCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'message-bus:run {channelName} {verb=vvv}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'run message bus channel consumer';

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle(MessageBusInterface $messageBus): void
    {
        $messageBus->runConsumerCommand($this->argument('channelName'), $this->argument('verb'));
    }
}

如果一切顺利,执行php artisan list命令将显示已配置的命令。

app文件夹中,创建一个Ecotone文件夹,并在其中创建配置文件

class EcotoneChannelProvider
{
    #[ServiceContext]
    public function enableEscrowChannel()
    {
        return [
            KafkaDistribuitedBusConfiguration::createPublisher(
                busReferenceName: EscrowBus::class,
                topicName: env('KAFKA_ESCROW_TOPIC_NAME'),
            ),
            KafkaDistribuitedBusConfiguration::createConsumer(
                topicName: env('KAFKA_ESCROW_TOPIC_NAME'),
                endpointId: 'consumer',
            ),
            PollingMetadata::create('consumer')
                ->setEnabledErrorChannel(true)
                ->setErrorChannelName('errorChannelPublisher'),
        ];
    }
}

其中

  • #[ServiceContext] - Ecotone的注解,用于指示服务配置
  • KafkaDistribuitedBusConfiguration::createPublisher - 创建事件发布者的驱动程序,在本例中,它是一个具有Distributed发送类型的kafka代理
    • busReferenceName(可选) - 有时需要发送多个事件,此参数告诉ecotone在引用此参数值时可以调用此主题的配置(如下面的示例),如果没有指定,则将作为一个默认发布者工作。
    • topicName - 指定接收事件的主题名称(在本例中,kafka将接收事件)
  • KafkaDistribuitedBusConfiguration::createConsumer - 创建事件消费者的驱动程序,在本例中,它是一个具有Distributed发送类型的kafka代理
    • topicName - 指定消费者将连接以消费事件的主题名称。
    • endpointId - 消费者通道的别名,此名称将在执行php artisan message-bus:list命令时显示
  • PollingMetadata::create - 创建连接池,以防事件处理发生错误,负责将'失败'消息重定向到Dead Letter Queue(DLQ)
    • setEnabledErrorChannel(true) - 启用DLQ
    • setErrorChannelName('errorChannelPublisher') - 指定DLQ的服务激活器。

发送事件

事件发送在技术上与查询相同,因此配置对两者都相同。

在某些情况下,需要发送多个事件,为此有一些额外的配置

在app/ecotone/[configuration].php文件中,在其配置中添加busReferenceName,并提供对接口的引用(如上例所示)。在依赖注入文件中添加对分发器的新的注入,建议使用laravel的类型变体。

分发器有一个withPublisherBusReferenceName函数,该函数接收busReferenceName配置中指定的引用。例如

$this->app->when(CreateFeatureController::class)->needs(TransactionDispatcher::class)->give(
            function () {
                $intance = $this->app->make(MessageBusInterface::class);
                return (new TransactionDispatcher($this->app->make(TransactionBus::class)))->withPublisherBusReferenceName(TransactionBus::class);
            }
        );

接收事件

在feature/Infrastructure/Eda中添加一个文件

class FeatureEventHandler
{
    public function __construct(private FeatureHandler $handler)
    {
    }

    #[Distributed]
    #[EventHandler(listenTo: "Domain\\Events\\FeatureCreated")]
    public function createdNotification(FeatureEvent $event): void
    {
         $result = $this->handler->handle($event);
         if ($result->isFailure()) {
             throw $result->getError();
         }
    }
}

其中

  • #[Distributed] - 指定配置的驱动程序类型,在上面的配置中,默认配置是distributed
  • #[EventHandler(listenTo: "Domain\Events\FeatureCreated")] - 指定该函数是事件处理程序
    • listenTo - 指定该处理程序将消费的事件路由,默认路由是事件自身的命名空间

与命令/查询类似,此文件具有将应用层作为桥梁的功能。

在创建命令后,在功能的infrastructure/creator文件夹中添加创建builderdirector的文件

class FeatureEventBuilder implements Builder
{
    private ?string $id = null;
    private ?array $payload = null;
    private ?array $headers = null;
    public function build(): Feature
    {
        return new Feature($this->id, $this->headers, $this->payload);
    }

    public function withId(string $id): self
    {
        $this->id = $id;
        return $this;
    }

    public function withHeaders(array $headers): self
    {
        $this->headers = $headers;
        return $this;
    }
    public function withPayload(array $payload): self
    {
        $this->payload = $payload;
        return $this;
    }

    public function targetType(): string
    {
        return Feature::class;
    }

}
/**
 * @implements Director<FeaturePlaced, array>
 */
class FeaturePlacedEventDirector implements Director
{
    public function __construct(
        private readonly FeaturePlacedEventBuilder $builder,
    ) {
    }

    public function make($data)
    {
        return $this->builder
            ->withId($data['messageHeader']['Identifier'] ?? '')
            ->withHeaders($data['messageHeader'])
            ->withPayload($data['data'])
            ->build();
    }

    public function targetType(): string
    {
        return $this->builder->targetType();
    }
}

创建后,应在Ecotone的JsonToPhpConverter的构造函数中注入事件工厂,例如

$this->app->singleton(JsonToPhpConverter::class, function () {
            $factory = new AbstractJsonToPhpFactory();
            $factory->addDirector(new FeatureEventDirector(new FeatureEventBuilder()));
            return new JsonToPhpConverter($factory);
        });

要启动事件消费者,请执行命令 php artisan message-bus:run {consumer},其中 {consumer} 是在 App/Ecotone/[configuration].php 文件中配置的 endpointId 键对应的名称。