webit/message-bus-sf-event-dispatcher

1.0.1 2018-01-07 19:19 UTC

Last update: 2024-09-20 22:48:22 UTC


Symfony Event Dispatcher infrastructure for Message Bus

Installation

composer require webit/message-bus-sf-event-dispatcher=^1.0.0

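Usage

Publisher integration

To publish a Message through the Symfony Event Dispatcher, use EventDispatcherPublisher.

MessageBusEventFactory

You need to tell EventDispatcherPublisher how to translate your Message into an event name and an event object for the Symfony Event Dispatcher. To do so, implement and configure a MessageBusEventFactory.

MessageBusEventFactory: example

Suppose you are going to publish two types of message, type-1 and type-2, and you want to map them to two different Symfony Event Dispatcher events, Event1 and Event2.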

use Symfony\Component\EventDispatcher\Event;

class Event1 extends Event
{
    private $x;
    
    public function __construct($x) {
        $this->x = $x;
    }
    
    public function x()
    {
        return $this->x;
    }
}

class Event2 extends Event
{
    private $y;
    
    private $z;
    
    public function __construct($y, $z) {
        $this->y = $y;
        $this->z = $z;
    }
    
    public function y()
    {
        return $this->y;
    }
    
    public function z()
    {
        return $this->z;
    }
}

Option 1: implement MessageBusEventFactory

use Webit\MessageBus\Message;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\MessageBusEventFactory;
// MessageBusEvent must be imported as well; use the namespace it has in your installed version.

class MessageBusEvent1Factory implements MessageBusEventFactory
{
    // Builds the event name / event object pair for "type-1" messages.
    public function create(Message $message): MessageBusEvent
    {
        $arContent = json_decode($message->content(), true);
        return new MessageBusEvent(
            $message->type(),
            new Event1($arContent['x'] ?? '')
        );
    }
}

class MessageBusEvent2Factory implements MessageBusEventFactory
{
    // Builds the event name / event object pair for "type-2" messages.
    public function create(Message $message): MessageBusEvent
    {
        $arContent = json_decode($message->content(), true);
        return new MessageBusEvent(
            $message->type(),
            new Event2(
                $arContent['y'] ?? '',
                $arContent['z'] ?? ''
            )
        );
    }
}

Then combine the two factories:

use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\ByMessageTypeMessageBusEventFactory;
$messageBusEventFactory = new ByMessageTypeMessageBusEventFactory([
    'type-1' => new MessageBusEvent1Factory(),
    'type-2' => new MessageBusEvent2Factory()
]);
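
The idea of routing by message type can be illustrated without the library. The following is a minimal sketch only: SketchByTypeRouter is a hypothetical stand-in, not the library's ByMessageTypeMessageBusEventFactory, and rejecting unknown types with an exception is an assumption made for the example.

```php
<?php
declare(strict_types=1);

// Hypothetical router: maps a message type to a callable that builds
// an [event name, payload] pair. It mimics the idea only, not the library class.
final class SketchByTypeRouter
{
    /** @var callable[] */
    private $factories;

    public function __construct(array $factories)
    {
        $this->factories = $factories;
    }

    public function create(string $type, string $content): array
    {
        if (!isset($this->factories[$type])) {
            // Assumption: unknown types are rejected; the library may behave differently.
            throw new InvalidArgumentException(sprintf('No factory registered for type "%s".', $type));
        }

        return ($this->factories[$type])($type, $content);
    }
}

$router = new SketchByTypeRouter([
    'type-1' => function (string $type, string $content): array {
        $data = json_decode($content, true);
        return [$type, ['x' => $data['x'] ?? '']];
    },
    'type-2' => function (string $type, string $content): array {
        $data = json_decode($content, true);
        return [$type, ['y' => $data['y'] ?? '', 'z' => $data['z'] ?? '']];
    },
]);

list($eventName, $payload) = $router->create('type-2', '{"y":"some-y","z":"some-z"}');
echo $eventName, ' ', json_encode($payload), PHP_EOL; // type-2 {"y":"some-y","z":"some-z"}
```

Each message type gets its own small factory, and the router only decides which one to call.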

Option 2: use GenericMessageBusEventFactory and implement its dependencies

use Webit\MessageBus\Message;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\Symfony\CallbackSymfonyEventFactory;

// Each callback returns only the Symfony event object;
// GenericMessageBusEventFactory pairs it with the resolved event name.
$eventFactory1 = new CallbackSymfonyEventFactory(
    function (Message $message) {
        $arContent = json_decode($message->content(), true);
        return new Event1($arContent['x'] ?? '');
    }
);

$eventFactory2 = new CallbackSymfonyEventFactory(
    function (Message $message) {
        $arContent = json_decode($message->content(), true);
        return new Event2(
            $arContent['y'] ?? '',
            $arContent['z'] ?? ''
        );
    }
);

Then wrap each event factory in a GenericMessageBusEventFactory:

use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\GenericMessageBusEventFactory;
$messageBusEventFactory1 = new GenericMessageBusEventFactory(
    $eventFactory1,
    new FromMessageTypeEventNameResolver() // optional, used by default; you can provide a different implementation
);

$messageBusEventFactory2 = new GenericMessageBusEventFactory(
    $eventFactory2
);

// combine both factories together
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\ByMessageTypeMessageBusEventFactory;

$messageBusEventFactory = new ByMessageTypeMessageBusEventFactory([
    'type-1' => $messageBusEventFactory1,
    'type-2' => $messageBusEventFactory2
]);
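
Option 3: implement your own strategy

Because EventDispatcherPublisher expects the MessageBusEventFactory interface as a dependency, you can provide your own implementation. You can also provide and combine the internal interfaces used by GenericMessageBusEventFactory: SymfonyEventFactory and EventNameResolver.

If you prefer JMSSerializer for producing the Symfony event objects, use JMSSerializerSymfonyEventFactory.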
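
As an illustration of a custom naming strategy, the sketch below derives the event name from the message type by adding a prefix. It is self-contained and does not use the library: SketchMessage, PrefixedEventNameResolver and the resolve() method name are hypothetical, so check the real EventNameResolver interface for the method you actually need to implement.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the library's Message, so the sketch runs on its own.
final class SketchMessage
{
    private $type;
    private $content;

    public function __construct(string $type, string $content)
    {
        $this->type = $type;
        $this->content = $content;
    }

    public function type(): string
    {
        return $this->type;
    }

    public function content(): string
    {
        return $this->content;
    }
}

// Custom strategy: the event name is the message type with a fixed prefix.
// The method name resolve() is an assumption, not the library's signature.
final class PrefixedEventNameResolver
{
    private $prefix;

    public function __construct(string $prefix)
    {
        $this->prefix = $prefix;
    }

    public function resolve(SketchMessage $message): string
    {
        return $this->prefix . $message->type();
    }
}

$resolver = new PrefixedEventNameResolver('app.');
echo $resolver->resolve(new SketchMessage('type-1', '{"x":"some-x"}')), PHP_EOL; // app.type-1
```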

Putting it all together

use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\EventDispatcherPublisher;
use Webit\MessageBus\Message;
use Symfony\Component\EventDispatcher\EventDispatcher;

$eventDispatcher = new EventDispatcher(); 

$publisher = new EventDispatcherPublisher(
    $eventDispatcher,
    $messageBusEventFactory
);

$message = new Message('type-1', '{"x":"some-x"}');
$publisher->publish($message); // will be dispatched as "type-1" with an event of class Event1

$message = new Message('type-2', '{"y":"some-y","z":"some-z"}');
$publisher->publish($message); // will be dispatched as "type-2" with an event of class Event2

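Event consuming

Why would you consume events?

  1. Publishing public events to the Message Bus. If you want some events to be public so that other applications can listen to them, use PublishingConsumer to publish them over a different infrastructure (e.g. AMQP).

  2. Asynchronous event processing. If you want to process some events asynchronously, use PublishingConsumer to publish them over a different infrastructure (e.g. AMQP) and then listen to them.

To consume a Message created from a Symfony Event Dispatcher event, use EventConsumingListener. It requires a MessageFromEventFactory and a Consumer.

GenericMessageFromEventFactory

It requires an EventSerialiser and a MessageTypeResolver (the event name is used by default).

Option 1: implement your own EventSerialiser
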
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\EventSerialiser;
use Symfony\Component\EventDispatcher\Event;

class Event1Serializer implements EventSerialiser
{
    public function serialise(MessageBusEvent $event): string
    {
        $symfonyEvent = $event->event();
        if ($symfonyEvent instanceof Event1) {
            return json_encode(['x' => $symfonyEvent->x()]);
        }
        throw new \InvalidArgumentException('Event must be an instance of Event1.');
    }
}

Option 2: use JMSSerializer to serialise the event

use JMS\Serializer\SerializerBuilder;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\JmsEventSerialiser;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\EventOnlySerialisationDataProvider;

$serializerBuilder = SerializerBuilder::create();

// configure Serializer

$serializer = $serializerBuilder->build();

$jsmEventSerialiser = new JmsEventSerialiser(
    $serializer,
    new EventOnlySerialisationDataProvider(), // used by default, provides data to be passed to the JMSSerializer,
    JmsEventSerialiser::FORMAT_JSON // JSON by default, can be JmsEventSerialiser::FORMAT_XML as well
);

Using FromMessageAwareEventMessageFromEventFactory

Alternatively, your events can implement the MessageAwareEvent interface.

use Webit\MessageBus\Message;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\MessageAwareEvent;
use Symfony\Component\EventDispatcher\Event;

class EventX extends Event implements MessageAwareEvent
{
    // The event builds its own Message; the content is encoded as JSON.
    public function createMessage(string $eventName): Message
    {
        return new Message($eventName, json_encode(['some' => 'stuff']));
    }
}

Then you can use FromMessageAwareEventMessageFromEventFactory to create the Message from such an event.

Putting it all together

Configure the MessageFromEventFactory

use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\ByEventNameMessageFromEventFactory;

$messageFactory = new ByEventNameMessageFromEventFactory([
    'type-1' => new GenericMessageFromEventFactory(
        new Event1Serializer()
    ),
    'type-2' => new GenericMessageFromEventFactory($jsmEventSerialiser)
]);

Create a listener

use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\EventConsumingListener;

$listener = new EventConsumingListener(
    new VoidConsumer(),
    $messageFactory
);

Register the listener with the Symfony Event Dispatcher for every event it should handle

$eventDispatcher->addListener('type-1', $listener);
$eventDispatcher->addListener('type-2', $listener);

// will produce new Message('type-1', '{"x":"xxx"}') and pass to the consumer
$eventDispatcher->dispatch('type-1', new Event1('xxx'));
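
To see how the listener side and the publisher side mirror each other, here is a minimal round-trip sketch in plain PHP. It is an illustration under assumptions, not library code: SketchEvent1 stands in for Event1 (without the Symfony base class), and serialiseEvent1() and eventFromContent() are hypothetical helpers that mimic what Event1Serializer and MessageBusEvent1Factory do with the message content.

```php
<?php
declare(strict_types=1);

// Stand-in for Event1 from the examples; it does not extend the Symfony
// Event class so that the sketch runs without any dependencies.
final class SketchEvent1
{
    private $x;

    public function __construct(string $x)
    {
        $this->x = $x;
    }

    public function x(): string
    {
        return $this->x;
    }
}

// Listener side: turn the event into message content.
function serialiseEvent1(SketchEvent1 $event): string
{
    return json_encode(['x' => $event->x()]);
}

// Publisher side: rebuild the event from message content.
function eventFromContent(string $content): SketchEvent1
{
    $data = json_decode($content, true);

    // json_decode() returns null for malformed JSON, so fall back to an empty array.
    if (!is_array($data)) {
        $data = [];
    }

    return new SketchEvent1((string) ($data['x'] ?? ''));
}

$content = serialiseEvent1(new SketchEvent1('xxx'));
echo $content, PHP_EOL;                        // {"x":"xxx"}
echo eventFromContent($content)->x(), PHP_EOL; // xxx
```

Falling back to an empty value on malformed content is a choice made for this sketch; depending on your application you may prefer to throw instead.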

Running tests

Install the dependencies using composer, then run the specs

docker-compose run --rm composer
docker-compose run --rm spec