djereg / symfony-rabbitmq
将 php-amqplib 集成到 Symfony Messenger & RabbitMq。
Requires
- php: ^8.2
- ext-pcntl: *
- datto/json-rpc: ^6.1
- illuminate/support: ^11.5
- php-amqplib/php-amqplib: ^3.0
- ramsey/uuid: ^4.7
- symfony/messenger: ^7.0
Requires (Dev)
- phpunit/phpunit: ^9.5
- psr/cache: ^3.0
- symfony/config: ^7.0
- symfony/console: ^7.0
- symfony/dependency-injection: ^7.0
- symfony/event-dispatcher: ^7.0
- symfony/framework-bundle: ^7.0
- symfony/http-kernel: ^7.0
- symfony/rate-limiter: ^7.0
- symfony/serializer: ^7.0
- symfony/yaml: ^7.0
Suggests
- symfony/framework-bundle: To use this lib as a full Symfony Bundle and to use the profiler data collector
Conflicts
- symfony/amqp-messenger: *
- symfony/config: <7.0
- symfony/console: <7.0
- symfony/messenger: <7.0
- symfony/yaml: <7.0
This package is auto-updated.
Last update: 2024-09-03 10:55:07 UTC
README
本包主要适用于在个人项目中内部/私有使用。如果它满足您的需求,请随意使用,但在修改请求的情况下,我将首先考虑自己的需求。
它仍然处于非常初期的开发阶段,因此我并不真正推荐现在使用它,因为任何东西都可能随时改变,并且之前的功能可能会中断。
本包是 rabbitmq-multiverse 的一部分。
目录
描述
本包是 Symfony AMQP Messenger Component 的替代品。功能非常相似,允许您启动消息消费者并将消息发送到 RabbitMQ。
通过添加通过 RabbitMQ 消息发送和接收事件和 RPC 查询的能力,扩展了原始 Symfony Messenger 的功能。
动机
由于微服务架构已经变得非常流行,我需要一个提供与不同编程语言或框架编写的服务通信的可能的库。
Symfony 有一个很好的消息系统,但它是一个封闭的仅适用于 Symfony 的系统。这个包允许您通过消息在 Symfony 和/或其他非 Symfony 微服务之间进行通信。
在简单的 JSON 消息之上,利用 Symfony Messenger 系统,它完美地完成了剩余的工作。
使用方法
安装
您可以通过以下命令使用 composer 安装此包:
composer require djereg/symfony-rabbitmq
配置
首先,您必须定义环境变量。
# Set the queue connection to rabbitmq RABBITMQ_DSN=amqp://guest:guest@rabbitmq:5672/%2f RABBITMQ_QUEUE_NAME=queue-name RABBITMQ_EXCHANGE_NAME=exchange-name RABBITMQ_EXCHANGE_TYPE=direct-name
然后,您必须将配置添加到 config/packages/messenger.yaml
文件中。
# config/packages/messenger.yaml framework: messenger: transports: # The name of the transport must be rabbitmq # If the transport is defined with a different name, # an exception will be thrown at runtime. rabbitmq: dsn: '%env(RABBITMQ_DSN)%' options: queue: name: '%env(RABBITMQ_QUEUE_NAME)%' exchange: name: '%env(RABBITMQ_EXCHANGE_NAME)%' type: '%env(RABBITMQ_EXCHANGE_TYPE)%'
启动消费者
要启动消费者,您必须运行以下命令。
php bin/console rabbitmq:consume
消费者将启动并监听传入的消息队列。
大多数选项与原始 Symfony Messenger 消费者相同。使用 -h
选项启动消费者以查看所有可用选项。
事件
提供基于事件的服务之间的异步通信。
分发事件
创建一个扩展 MessagePublishEvent
类的事件类。
use Djereg\Symfony\RabbitMQ\Event\MessagePublishEvent; class UserCreated extends MessagePublishEvent { // Set the event name protected string $event = 'user.created'; public function __construct(private User $user) { $this->user = $user; } // Create a payload method that returns the data to be sent public function payload(): array { return [ 'user_id' => $this->user->id, ]; } }
然后像任何其他 Symfony 事件一样分发事件。
几乎一样,只是有一点不同。您必须使用此包中包含的 EventDispatcher
,而不是 Symfony 事件调度器。
由于 Symfony 事件系统不支持在许多事件之上监听接口,因此 EventDispatcher
通过在内部调用 Symfony 事件调度器并传递 MessagePublishEvent
的全名,使这个技巧得以实现,并使监听此事件的监听器能够捕获实现此接口的所有事件。
use Djereg\Symfony\RabbitMQ\Service\EventDispatcher; class UserService { public function __construct( private EventDispatcher $dispatcher ) { // } public function createUser(User $user): void { // Dispatch the event $this->dispatcher->dispatch(new UserCreated($user)); } }
就是这样,并不复杂。
监听事件
创建一个事件监听器类,并像下面示例一样添加 AsMessageEventListener
属性。
您必须在属性中定义事件名称。事件名称必须与事件对象中定义的事件名称相同。
该属性的行为与Symfony事件监听器属性完全一样,但为服务添加了一个额外的标签,这有助于收集监听的事件。名称与Symfony属性不同,以避免混淆所使用的事件系统。
use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener; #[AsMessageEventListener('user.created')] class NotifyUserListener { public function __invoke(MessageEvent $event): void { // Do something } }
更多关于事件监听器的内容请参阅 Symfony 文档。您唯一需要记住的是,在监听器中定义事件名称。
监听器中的错误
当监听器中出现未处理错误时,消息将被重新入队,并将事件再次分发。这将继续发生,直到消息成功处理或达到最大尝试次数。如果多个监听器正在监听同一事件,则在抛出异常的第一个监听器处停止处理,其余监听器将不会被调用。
防止这种行为有两种方法。第一种是在监听器中捕获异常并处理它。第二种方法是监听事件并将消息放入队列,然后分别和异步地处理它们。这样,失败的消息就不会阻塞其他消息。
如何异步处理事件?
哦,这很简单!您需要一个中间监听器来自动将消息放入队列,还需要一个消息处理器来处理消息。
首先创建一个继承自 EventMessage
的消息。此消息将被发送到队列并由消息处理器处理。
use Djereg\Symfony\RabbitMQ\Message\EventMessage; class UserCreatedMessage extends EventMessage {}
然后创建一个继承自 MessageEventListener
的事件监听器。此监听器将监听事件并将消息自动放入队列。
use Djereg\Symfony\RabbitMQ\Attribute\AsMessageEventListener; use Djereg\Symfony\RabbitMQ\Listeners\MessageEventListener; #[AsMessageEventListener('user.created')] class UserCreatedListener extends MessageEventListener { protected string $message = UserCreatedMessage::class; }
最后,创建一个消息处理器来处理放入队列的消息。
#[AsMessageHandler] class UserCreatedMessageHandler { public function __invoke(UserCreatedMessage $message): void { // Get the event name $event = $message->getEvent(); // Get the event payload $payload = $event->getPayload(); // Get the event wrapped in the message $raw = $message->getMessageEvent(); } }
这很简单,不是吗?我知道,其实并不简单。但它有效。
订阅事件
消费者会自动创建不存在时交换和队列,并将所有监听的事件作为绑定键注册到队列上。
RPC
服务之间的类似同步的通信。
使用 JSON-RPC 2.0 协议进行通信。
注册客户端
要调用远程过程,您必须创建 Client
类的实例并将其注册到服务容器中。
# config/services.yaml services: users_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: users # Some example client definitions below orders_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: orders products_client: class: Djereg\Symfony\RabbitMQ\Service\Client tags: - name: rabbitmq.rpc.client queue: products
调用远程过程
创建一个服务并将客户端注入其中。
use Djereg\Symfony\RabbitMQ\Contract\ClientInterface; use Symfony\Component\DependencyInjection\Attribute\Autowire; class UserService { public function __construct( #[Autowire('users_client')] private ClientInterface $client, ) { // } public function getUser(int $id): User { // Call the remote procedure $user = $this->client->call('get', ['id' => $id]); // Return the user or do something else with it return $user; } }
注册远程过程
创建一个服务并添加 AsRemoteProcedure
属性。
它与上面描述的事件监听器非常相似。您可以将该属性添加到类或方法中。
use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure; #[AsRemoteProcedure('get')] class GetUser { public function __invoke(int $id): array { // Query the database and return the result } }
或者将属性添加到方法的另一个示例。
use Djereg\Symfony\RabbitMQ\Attribute\AsRemoteProcedure; class UserService { #[AsRemoteProcedure('get')] public function getUser(int $id): array { // Query the database and return the result } // When adding the attribute to a method, you can omit the name of the procedure. // In this case, the name will be the same as the method name. #[AsRemoteProcedure] public function update(int $id, array $data): bool { // Update the user and return the result of the operation } }
当注册两个或多个具有相同名称的过程时,在启动时将抛出异常。
Symfony Messenger
原始Symfony Messenger组件的功能也可用。将消息 路由到 rabbitmq 传输,然后它们将被发送到队列并由消费者处理。
生命周期事件
MessagePublishingEvent
在消息发送到队列之前分发。
use Djereg\Symfony\RabbitMQ\Event\MessagePublishingEvent;
MessageReceivedEvent
在从队列接收到消息时分发。
use Djereg\Symfony\RabbitMQ\Event\MessageReceivedEvent;
MessageProcessingEvent
在处理消息时分发。
use Djereg\Symfony\RabbitMQ\Event\MessageProcessingEvent;
MessageProcessedEvent
在处理消息后分发。
use Djereg\Symfony\RabbitMQ\Event\MessageProcessedEvent;
已知问题
- 没有测试!我知道,我知道。我很快就会写。