djereg / laravel-rabbitmq
Laravel RabbitMQ
Requires
- php: ^8.2
- ext-pcntl: *
- datto/json-rpc: ^6.1
- illuminate/contracts: ^10.0 || ^11.0
- illuminate/queue: ^10.0 || ^11.0
- illuminate/support: ^10.0 || ^11.0
- php-amqplib/php-amqplib: ^3.0
- vladimir-yuldashev/laravel-queue-rabbitmq: ^13.0 || ^14.0
Requires (Dev)
- orchestra/testbench: ^8.0 || ^9.0
README
本软件包主要面向内部/私有项目使用。如果它符合您的需求,请随意使用,但如果需要任何修改,我会首先考虑我的个人需求。
它目前处于非常早期的开发阶段,因此我目前不推荐使用它,因为任何东西都可能随时发生变化,并且之前的功能可能会中断。
本软件包是rabbitmq-multiverse的一部分。
目录
描述
本软件包是 RabbitMQ 和 Laravel 队列之间的中间层。
本软件包基于vladimir-yuldashev/laravel-queue-rabbitmq软件包,该软件包为 Laravel 添加了 RabbitMQ 作为队列驱动程序。
本软件包通过添加通过 RabbitMQ 消息发送和接收事件和 RPC 调用的能力,扩展了原始软件包的功能。
动机
由于微服务架构变得越来越流行,我需要一个库,该库可以提供与用不同编程语言或框架编写的服务进行通信的可能性。
Laravel 有一个强大的队列系统,但它是一个封闭的 Laravel 仅系统。本软件包允许您在 Laravel 和/或其他非 Laravel 微服务之间通过消息进行通信。
在简单的 JSON 消息之上,利用 Laravel 队列和事件系统,它完美地完成了其余的工作。
使用
安装
您可以使用以下命令通过 composer 安装此软件包
composer require djereg/laravel-rabbitmq
软件包将自动注册自身。
配置
配置通过环境变量完成。
# Set the queue connection to rabbitmq QUEUE_CONNECTION=rabbitmq RABBITMQ_HOST=rabbitmq RABBITMQ_PORT=5672 RABBITMQ_USER=guest RABBITMQ_PASSWORD=guest RABBITMQ_VHOST=/ RABBITMQ_QUEUE=queue-name RABBITMQ_EXCHANGE_NAME=exchange-name RABBITMQ_EXCHANGE_TYPE=direct
启动消费者
要启动消费者,只需运行以下命令
php artisan rabbitmq:consume
事件
提供基于事件的服务之间的异步通信。
派发事件
创建一个扩展本软件包提供的MessagePublishEvent
的事件类。
# app/Events/UserCreated.php namespace App\Events; use Djereg\Laravel\RabbitMQ\Events\MessagePublishEvent; class UserCreated extends MessagePublishEvent { // Set the event name protected string $event = 'user.created'; public function __construct(private User $user) { $this->user = $user; } // Create a payload method that returns the data to be sent public function payload(): array { return [ 'user_id' => $this->user->id, ]; } }
然后像任何其他 Laravel 事件一样派发事件。
event(new UserCreated($user));
监听事件
创建一个扩展本软件包提供的MessageEventListener
的事件监听器。
其工作原理与 Laravel 事件监听器略有不同。首先,您必须在$listen
属性中指定您想要监听的事件。接下来,而不是公共的handle()
方法,您必须定义onEvent()
方法。这是因为handle()
方法已经被基础MessageEventListener
类内部使用。
# app/Listeners/NotifyUser.php namespace App\Listeners; use Djereg\Laravel\RabbitMQ\Listeners\MessageEventListener; class NotifyUser extends MessageEventListener { // Specify the events you want to listen to. // You can listen to multiple events by adding them to the array. public static array $listen = [ 'user.created', // 'user.updated', // 'user.deleted', // etc ]; // The method that will be called when the event is received. // The event object is passed as an argument containing the event name and payload. protected function onEvent(MessageEvent $event): void { } // You can also define separate methods for each event. // The method name must be in the format on{EventName} where {EventName} // is the StudlyCase format of the event defined in the $listen property. // When both methods are defined, the on{EventName} method will be called. protected function onUserCreated(MessageEvent $event): void { // } // If no on{EventName} or onEvent method is defined, an exception will be thrown. }
事件监听器中的错误
由于事件监听器默认情况下是同步处理的,如果发生错误,作业将失败,并且如果启用了重试机制并进行了配置,它将重试。
如果有多个监听器监听同一个事件,处理将在第一个抛出错误的监听器处停止,其余的监听器将不会被执行。
您有多种方法来防止这种行为
- 在监听器中运行try-catch块,并在监听器中处理错误。
- 将监听器放入队列中,异步处理事件。这样,失败的监听器(作业)不会阻塞其他监听器的处理。
异步处理事件
默认情况下,事件是同步处理的,但您可以通过实现ShouldQueue
接口来异步处理它们。
# app/Listeners/NotifyUser.php namespace App\Listeners; use Djereg\Laravel\RabbitMQ\Listeners\MessageEventListener; class NotifyUser extends MessageEventListener implements ShouldQueue { // Listener content }
订阅事件
在启动消费者时,如果它们不存在,将自动创建交换和队列,但为了注册要监听的事件,您必须修改EventServiceProvider
以扩展由本包提供的EventServiceProvider
。在Laravel 11中,默认情况下不存在EventServiceProvider
,因此您必须手动创建和注册它。请参阅下面的示例。
# app/Providers/EventServiceProvider.php namespace App\Providers; use Djereg\Laravel\RabbitMQ\Providers\EventServiceProvider as ServiceProvider; class EventServiceProvider extends ServiceProvider { // Provider content }
服务提供程序从app/Listeners
目录发现所有事件监听器,并且当消费者启动时,路由键将自动绑定。
远程过程调用(RPC)
服务之间的类似同步通信。
使用JSON-RPC 2.0协议进行通信。
注册客户端
要调用远程过程,您必须创建Client
类的实例,并将其注入到您想使用的服务中。最佳做法是在服务提供程序中这样做。
# app/Providers/AppServiceProvider.php namespace App\Providers; use Djereg\Laravel\RabbitMQ\Services\Client; class AppServiceProvider extends ServiceProvider { public function register(): void { $this->app->singleton(UserService::class, function($app) { // Instantiate the client with the remote service name and the queue connection $client = new Client('users', $app['queue.connection']) // Inject the client into the service return new UserService($client); }); } }
无论如何,您可以在任何位置创建客户端实例,但请记住将队列连接作为第二个参数传递。
$client = new Client('users', app('queue.connection'));
调用远程过程
在注入客户端的服务中,您可以调用远程过程。
# app/Services/UserService.php namespace App\Services; use Djereg\Laravel\RabbitMQ\Services\Client; class UserService { public function __construct(private Client $users) {} public function getUser(int $id): mixed { // Call the remote procedure $user = $this->users->call('get', ['id' => $id]); // Process the response and return it } }
注册远程过程
注册由本包提供的ProcedureServiceProvider
。服务提供程序将默认从app/Procedures
目录自动发现所有过程。自动发现仅在控制台模式下启动应用程序时运行。
如果您想自定义服务提供程序,您可以创建自己的,它扩展了本包提供的ProcedureServiceProvider
类,并将其注册。
# app/Providers/ProcedureServiceProvider.php namespace App\Providers; use Djereg\Laravel\RabbitMQ\Providers\ProcedureServiceProvider as ServiceProvider; class ProcedureServiceProvider extends ServiceProvider { // }
处理过程调用
当服务提供程序注册后,您可以在app/Procedures
目录中创建过程。创建一个扩展Procedure
类的类,并使用过程的名称定义method
属性。
# app/Procedures/GetUser.php namespace App\Procedures; use Djereg\Laravel\RabbitMQ\Procedures\Procedure; class GetUser extends Procedure { // Set the procedure name that will be called by the client public static string $name = 'get'; public function __invoke(int $id): mixed { // Get the user from the database and return it } // OR public function handle(int $id): mixed { // Get the user from the database and return it } }
您可以定义__invoke()
或handle()
方法来处理过程调用。如果定义了两种方法,则会抛出异常,因为一个过程不允许有多个处理程序。对于具有相同名称的多个过程类也适用。
过程调用是如何处理的?
当收到过程调用消息时,请求体将传递给datto/php-json-rpc服务器组件,该组件处理请求并调用匹配的过程,最后返回响应对象,并将其发送回请求者。
Laravel 队列
此包也支持Laravel Queue。您可以将作业发送到队列,消费者将像原始的Laravel队列工作者一样处理它们。
生命周期事件
在消息处理过程中,此包会发出事件。
消息发布
在消息发布之前发出。
use Djereg\Laravel\RabbitMQ\Events\MessagePublishing;
消息已发布
在消息发布后发出。
use Djereg\Laravel\RabbitMQ\Events\MessagePublished;
消息处理
在消息处理之前发出。
use Djereg\Laravel\RabbitMQ\Events\MessageReceived;
消息已处理
在消息处理之后发出。
use Djereg\Laravel\RabbitMQ\Events\MessageProcessed;
已知问题
- 没有测试!我知道,我知道。我很快就会写。