djereg/symfony-rabbitmq

将 php-amqplib 集成到 Symfony Messenger & RabbitMq。

安装: 3

依赖者: 0

建议者: 0

安全性: 0

星星: 0

观察者: 1

分支: 0

开放问题: 0

类型:symfony-bundle

v0.1.2 2024-05-02 15:49 UTC

This package is auto-updated.

Last update: 2024-09-03 10:55:07 UTC


README

本包主要适用于在个人项目中内部/私有使用。如果它满足您的需求,请随意使用,但在修改请求的情况下,我将首先考虑自己的需求。

它仍然处于非常初期的开发阶段,因此我并不真正推荐现在使用它,因为任何东西都可能随时改变,并且之前的功能可能会中断。

本包是 rabbitmq-multiverse 的一部分。

目录

描述

本包是 Symfony AMQP Messenger Component 的替代品。功能非常相似,允许您启动消息消费者并将消息发送到 RabbitMQ。

通过添加通过 RabbitMQ 消息发送和接收事件和 RPC 查询的能力,扩展了原始 Symfony Messenger 的功能。

动机

由于微服务架构已经变得非常流行,我需要一个提供与不同编程语言或框架编写的服务通信的可能的库。

Symfony 有一个很好的消息系统,但它是一个封闭的仅适用于 Symfony 的系统。这个包允许您通过消息在 Symfony 和/或其他非 Symfony 微服务之间进行通信。

在简单的 JSON 消息之上,利用 Symfony Messenger 系统,它完美地完成了剩余的工作。

使用方法

安装

您可以通过以下命令使用 composer 安装此包:

composer require djereg/symfony-rabbitmq

配置

首先,您必须定义环境变量。

# Set the queue connection to rabbitmq
RABBITMQ_DSN=amqp://guest:guest@rabbitmq:5672/%2f

RABBITMQ_QUEUE_NAME=queue-name
RABBITMQ_EXCHANGE_NAME=exchange-name
RABBITMQ_EXCHANGE_TYPE=direct-name

然后,您必须将配置添加到 config/packages/messenger.yaml 文件中。

# config/packages/messenger.yaml

framework:
    messenger:
        transports:

            # The name of the transport must be rabbitmq
            # If the transport is defined with a different name,
            # an exception will be thrown at runtime.
            rabbitmq:
                dsn: '%env(RABBITMQ_DSN)%'
                options:
                    queue:
                        name: '%env(RABBITMQ_QUEUE_NAME)%'
                    exchange:
                        name: '%env(RABBITMQ_EXCHANGE_NAME)%'
                        type: '%env(RABBITMQ_EXCHANGE_TYPE)%'

启动消费者

要启动消费者,您必须运行以下命令。

php bin/console rabbitmq:consume

消费者将启动并监听传入的消息队列。

大多数选项与原始 Symfony Messenger 消费者相同。使用 -h 选项启动消费者以查看所有可用选项。

事件

提供基于事件的服务之间的异步通信。

分发事件

创建一个扩展 MessagePublishEvent 类的事件类。

use Djereg\Symfony\RabbitMQ\Event\MessagePublishEvent;

class UserCreated extends MessagePublishEvent
{
    // Set the event name
    protected string $event = 'user.created';

    public function __construct(private User $user)
    {
        $this->user = $user;
    }

    // Create a payload method that returns the data to be sent
    public function payload(): array
    {
        return [
            'user_id' => $this->user->id,
        ];
    }
}

然后像任何其他 Symfony 事件一样分发事件。

几乎一样,只是有一点不同。您必须使用此包中包含的 EventDispatcher,而不是 Symfony 事件调度器。

由于 Symfony 事件系统不支持在许多事件之上监听接口,因此 EventDispatcher 通过在内部调用 Symfony 事件调度器并传递 MessagePublishEvent 的全名,使这个技巧得以实现,并使监听此事件的监听器能够捕获实现此接口的所有事件。

use Djereg\Symfony\RabbitMQ\Service\EventDispatcher;

class UserService
{
    public function __construct(
        private EventDispatcher $dispatcher
    ) {
        //
    }

    public function createUser(User $user): void
    {
        // Dispatch the event
        $this->dispatcher->dispatch(new UserCreated($user));
    }
}

就是这样,并不复杂。

监听事件

创建一个事件监听器类,并像下面示例一样添加 AsMessageEventListener 属性。

您必须在属性中定义事件名称。事件名称必须与事件对象中定义的事件名称相同。

该属性的行为与Symfony事件监听器属性完全一样,但为服务添加了一个额外的标签,这有助于收集监听的事件。名称与Symfony属性不同,以避免混淆所使用的事件系统。

use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener;

#[AsMessageEventListener('user.created')]
class NotifyUserListener
{
    public function __invoke(MessageEvent $event): void
    {
        // Do something
    }
}

更多关于事件监听器的内容请参阅 Symfony 文档。您唯一需要记住的是,在监听器中定义事件名称。

监听器中的错误

当监听器中出现未处理错误时,消息将被重新入队,并将事件再次分发。这将继续发生,直到消息成功处理或达到最大尝试次数。如果多个监听器正在监听同一事件,则在抛出异常的第一个监听器处停止处理,其余监听器将不会被调用。

防止这种行为有两种方法。第一种是在监听器中捕获异常并处理它。第二种方法是监听事件并将消息放入队列,然后分别和异步地处理它们。这样,失败的消息就不会阻塞其他消息。

如何异步处理事件?

哦,这很简单!您需要一个中间监听器来自动将消息放入队列,还需要一个消息处理器来处理消息。

首先创建一个继承自 EventMessage 的消息。此消息将被发送到队列并由消息处理器处理。

use Djereg\Symfony\RabbitMQ\Message\EventMessage;

class UserCreatedMessage extends EventMessage {}

然后创建一个继承自 MessageEventListener 的事件监听器。此监听器将监听事件并将消息自动放入队列。

use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener;
use Djereg\Symfony\RabbitMQ\Listeners\MessageEventListener;

#[AsMessageEventListener('user.created')]
class UserCreatedListener extends MessageEventListener
{
    protected string $message = UserCreatedMessage::class;
}

最后,创建一个消息处理器来处理放入队列的消息。

#[AsMessageHandler]
class UserCreatedMessageHandler
{
    public function __invoke(UserCreatedMessage $message): void {

        // Get the event name
        $event = $message->getEvent();

        // Get the event payload
        $payload = $event->getPayload();

        // Get the event wrapped in the message
        $raw = $message->getMessageEvent();
    }
}

这很简单,不是吗?我知道,其实并不简单。但它有效。

订阅事件

消费者会自动创建不存在时交换和队列,并将所有监听的事件作为绑定键注册到队列上。

RPC

服务之间的类似同步的通信。

使用 JSON-RPC 2.0 协议进行通信。

注册客户端

要调用远程过程,您必须创建 Client 类的实例并将其注册到服务容器中。

# config/services.yaml

services:
    users_client:
        class: Djereg\Symfony\RabbitMQ\Service\Client
        tags:
            -   name: rabbitmq.rpc.client
                queue: users

    # Some example client definitions below
    orders_client:
        class: Djereg\Symfony\RabbitMQ\Service\Client
        tags:
            -   name: rabbitmq.rpc.client
                queue: orders

    products_client:
        class: Djereg\Symfony\RabbitMQ\Service\Client
        tags:
            -   name: rabbitmq.rpc.client
                queue: products

调用远程过程

创建一个服务并将客户端注入其中。

use Djereg\Symfony\RabbitMQ\Contract\ClientInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;

class UserService
{
    public function __construct(
        #[Autowire('users_client')]
        private ClientInterface $client,
    ) {
        //
    }

    public function getUser(int $id): User
    {
        // Call the remote procedure
        $user = $this->client->call('get', ['id' => $id]);

        // Return the user or do something else with it
        return $user;
    }
}

注册远程过程

创建一个服务并添加 AsRemoteProcedure 属性。

它与上面描述的事件监听器非常相似。您可以将该属性添加到类或方法中。

use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure;

#[AsRemoteProcedure('get')]
class GetUser
{
    public function __invoke(int $id): array
    {
        // Query the database and return the result
    }
}

或者将属性添加到方法的另一个示例。

use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure;

class UserService
{
    #[AsRemoteProcedure('get')]
    public function getUser(int $id): array
    {
        // Query the database and return the result
    }

    // When adding the attribute to a method, you can omit the name of the procedure.
    // In this case, the name will be the same as the method name.
    #[AsRemoteProcedure]
    public function update(int $id, array $data): bool
    {
        // Update the user and return the result of the operation
    }
}

当注册两个或多个具有相同名称的过程时,在启动时将抛出异常。

Symfony Messenger

原始Symfony Messenger组件的功能也可用。将消息 路由到 rabbitmq 传输,然后它们将被发送到队列并由消费者处理。

生命周期事件

MessagePublishingEvent

在消息发送到队列之前分发。

use Djereg\Symfony\RabbitMQ\Event\MessagePublishingEvent;

MessageReceivedEvent

在从队列接收到消息时分发。

use Djereg\Symfony\RabbitMQ\Event\MessageReceivedEvent;

MessageProcessingEvent

在处理消息时分发。

use Djereg\Symfony\RabbitMQ\Event\MessageProcessingEvent;

MessageProcessedEvent

在处理消息后分发。

use Djereg\Symfony\RabbitMQ\Event\MessageProcessedEvent;

已知问题

  • 没有测试!我知道,我知道。我很快就会写。

许可

MIT许可