girni / laravel-rabbitmq
扩展 https://github.com/vyuldashev/laravel-queue-rabbitmq 以支持微服务通信的 Laravel RabbitMQ 包。
Requires
- php: >=8.0
- illuminate/support: ^9.0|^10.0
- vladimir-yuldashev/laravel-queue-rabbitmq: ^13.0
Requires (Dev)
- orchestra/testbench: ^7.0
- phpunit/phpunit: ^9.0
README
扩展 https://github.com/vyuldashev/laravel-queue-rabbitmq 以支持微服务通信的 Laravel RabbitMQ 包。
安装
composer require girni/laravel-rabbitmq
在您的 composer.json
文件中,在 require
部分 repositories
{ "require": { "girni/laravel-rabbitmq": "^1.0" } }
之后,只需运行 composer install
或 composer update girni/laravel-rabbitmq
以安装它。
该包将自动注册自己。
将连接添加到 config/queue.php
'connections' => [ // ... 'rabbitmq' => [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], ], 'options' => [ 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], 'queue' => [ 'job' => Girni\LaravelRabbitMQ\LaravelRabbitMQJobHandler::class, ], ], /* * Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', Girni\LaravelRabbitMQ\Queue\RabbitMQQueue::class), ], // ... ],
发布 laravel-rabbitmq.php
配置文件
php artisan vendor:publish --provider="Girni\LaravelRabbitMQ\LaravelRabbitMQServiceProvider" --tag="config"
用法
在 .env 文件中定义队列
为了确保我们的应用程序之间通信运行顺畅,有必要为每个应用程序定义单独的队列。我们可以在 .env
文件中完成。
RABBITMQ_QUEUE=my-application-queue
定义 RABBIT_QUEUE
将导致我们的应用程序只从该队列接收消息。通过为每个应用程序创建不同的队列,我们保证我们的消息不会被其他应用程序以错误的方式处理。
创建生产者
生产意味着发送。发送消息的类是一个生产者。我们的生产者类应尽可能简单。它应包含通过构造函数传递的消息数据和必须实现合约的 ::name()
。
::name()
的值非常重要,它被用来识别我们应该运行哪个消费者来处理产生的消息。
<?php namespace App\Jobs\Producer; use Girni\LaravelRabbitMQ\Message\MessageInterface; use Girni\LaravelRabbitMQ\Producer\AbstractProducer; class PingJobProducer extends AbstractProducer { public function __construct(MessageInterface $message) { parent::__construct($message); } public function name(): string { return 'ping:job'; } }
分发消息
要将消息发送到 RabbitMQ,我们可以使用 Laravel 分发器以多种方式。更多详情请参阅 https://laravel.net.cn/docs/9.x/queues#dispatching-jobs
PingJobProducer::dispatch(\Girni\LaravelRabbitMQ\Message\BaseMessage::fromArray([ 'key1' => 'value', 'key2' => 'value2' ]))->onQueue('my-queue')
定义要发送消息的队列非常重要,我们可以使用 ::onQueue()
方法来定义。如前所述,每个应用程序应该只从自己的队列接收消息。因此,如果我们想使我们的消息被另一个应用程序处理,我们需要将其发送到该应用程序的队列。
非 Laravel 发送者的消息结构
JSON 示例
{ "job": "producer", "data": { "key": "value" } }
创建消费者
消费者类不过是生产者的“处理程序”。这个类必须处理生产者发送的数据。
namespace App\Jobs\Consumer; use Girni\LaravelRabbitMQ\Consumer\ConsumerInterface; class PingJobConsumer implements ConsumerInterface { use Dispatchable; use InteractsWithQueue; use Queueable; use SerializesModels; public function producer(): string { return 'ping:job'; // this value must be same as Producer ::name() method we want to consume. } public function handle(MessageInterface $message): void { $key1 = $message->get('key1'); // value $key2 = $message->get('key2'); // value2 \DB::table('test-table')->create(['key1' => $key1, 'key2' => $key2]); } }
注册消费者
在您的应用程序中,在 config\laravel-rabbitmq.php
文件中的 consumers
数组中,您必须注册您想要在应用程序中使用每个消费者。您的配置应该看起来像
<?php /* * You can place your custom package configuration in here. */ return [ 'consumers' => [ \App\Jobs\Consumer\PingJobConsumer::class, \App\Jobs\Consumer\MyTestConsumer::class ] ];
消费消息
要开始消费产生的消息,我们需要运行我们的队列
php artisan rabbitmq:consume
或标准的 Laravel 流程
php artisan queue:work
高级用法 - 自定义消息类
作为标准消息存储,我们实现了 Girni\LaravelRabbitMQ\Message\BaseMessage::class
,它具有简单的接口,操作数组键。
它包含一些辅助方法,如 ::get($key), ::set($key)
,以帮助客户端从消息中获取/设置数据。
然而,在某些情况下,我们的消息可能非常复杂,我们可能更喜欢将数据存储在具有面向对象接口的专用类(DTO)中。
我们可以通过将Girni\LaravelRabbitMQConsumer\HasCustomMessage
接口添加到我们的消费者类中来实现。这个接口包含一个::message()
方法。在这个方法的主体中,您应该传递一个全限定类名(FQCN)到您的类。
重要:请确保您的DTO类实现了Girni\LaravelRabbitMQ\Message\MessageInterface
接口。我们库创建对象的全部逻辑都隐藏在::fromArray(array $data)
方法中。
namespace App\Jobs\Consumer\Message; use Girni\LaravelRabbitMQ\Message\MessageInterface; class PingJobMessage implements MessageInterface { private string $id; private string $name; private Carbon $date; public function __construct(string $id, string $name, Carbon $date) { $this->id = $id; $this->name = $name; $this->date = $date; } public function getId(): string { return $this->id; } public function getName(): string { return $this->name; } public function getDate(): Carbon { return $this->date; } public static function fromArray(array $data): MessageInterface { return new self( $data['id'], $data['name'], Carbon::createFromFormat('Y-m-d H:i:s', $data['date']) ); } public function toArray(): array { return [ 'id' => $this->id, 'name' => $this->name, 'date' => $this->date ]; } } // consumer using the message namespace App\Jobs\Consumer; use Girni\LaravelRabbitMQ\Consumer\ConsumerInterface; use Girni\LaravelRabbitMQ\Consumer\HasCustomMessage; class PingJobConsumer implements ConsumerInterface, HasCustomMessage { use Dispatchable; use InteractsWithQueue; use Queueable; use SerializesModels; public function producer(): string { return 'ping:job'; } /** * @param \App\Jobs\Consumer\Message\PingJobMessage $message * @return void */ public function handle(MessageInterface $message): void { \DB::table('test-table')->create(['name' => $message->getName(), 'date' => $message->getDate()->format('Y-m-d')]); } public function message(): string { return \App\Jobs\Consumer\Message\PingJobMessage::class; } }
测试
composer test
变更日志
有关最近更改的更多信息,请参阅变更日志。
许可证
MIT许可证(MIT)。有关更多信息,请参阅许可证文件。