girni/laravel-rabbitmq

扩展 https://github.com/vyuldashev/laravel-queue-rabbitmq 以支持微服务通信的 Laravel RabbitMQ 包。

1.0.0 2024-01-17 14:21 UTC

This package is auto-updated.

Last update: 2024-09-17 16:50:35 UTC


README

扩展 https://github.com/vyuldashev/laravel-queue-rabbitmq 以支持微服务通信的 Laravel RabbitMQ 包。

安装

composer require girni/laravel-rabbitmq

在您的 composer.json 文件中,在 require 部分 repositories

{
  "require": {
    "girni/laravel-rabbitmq": "^1.0"
  }
}

之后,只需运行 composer installcomposer update girni/laravel-rabbitmq 以安装它。

该包将自动注册自己。

将连接添加到 config/queue.php

'connections' => [
    // ...
    'rabbitmq' => [
    
       'driver' => 'rabbitmq',
       'queue' => env('RABBITMQ_QUEUE', 'default'),
       'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
   
       'hosts' => [
           [
               'host' => env('RABBITMQ_HOST', '127.0.0.1'),
               'port' => env('RABBITMQ_PORT', 5672),
               'user' => env('RABBITMQ_USER', 'guest'),
               'password' => env('RABBITMQ_PASSWORD', 'guest'),
               'vhost' => env('RABBITMQ_VHOST', '/'),
           ],
       ],
   
       'options' => [
           'ssl_options' => [
               'cafile' => env('RABBITMQ_SSL_CAFILE', null),
               'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
               'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
               'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
               'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
           ],
           'queue' => [
               'job' => Girni\LaravelRabbitMQ\LaravelRabbitMQJobHandler::class,
           ],
       ],
   
       /*
        * Set to "horizon" if you wish to use Laravel Horizon.
        */
       'worker' => env('RABBITMQ_WORKER', Girni\LaravelRabbitMQ\Queue\RabbitMQQueue::class),
        
    ],
    // ...    
],

发布 laravel-rabbitmq.php 配置文件

php artisan vendor:publish --provider="Girni\LaravelRabbitMQ\LaravelRabbitMQServiceProvider" --tag="config"

用法

在 .env 文件中定义队列

为了确保我们的应用程序之间通信运行顺畅,有必要为每个应用程序定义单独的队列。我们可以在 .env 文件中完成。

RABBITMQ_QUEUE=my-application-queue

定义 RABBIT_QUEUE 将导致我们的应用程序只从该队列接收消息。通过为每个应用程序创建不同的队列,我们保证我们的消息不会被其他应用程序以错误的方式处理。

创建生产者

生产意味着发送。发送消息的类是一个生产者。我们的生产者类应尽可能简单。它应包含通过构造函数传递的消息数据和必须实现合约的 ::name()

::name() 的值非常重要,它被用来识别我们应该运行哪个消费者来处理产生的消息。

<?php

namespace App\Jobs\Producer;

use Girni\LaravelRabbitMQ\Message\MessageInterface;
use Girni\LaravelRabbitMQ\Producer\AbstractProducer;

class PingJobProducer extends AbstractProducer
{
    public function __construct(MessageInterface $message) 
    {        
        parent::__construct($message);
    }

    public function name(): string
    {
        return 'ping:job';
    }
}

分发消息

要将消息发送到 RabbitMQ,我们可以使用 Laravel 分发器以多种方式。更多详情请参阅 https://laravel.net.cn/docs/9.x/queues#dispatching-jobs

PingJobProducer::dispatch(\Girni\LaravelRabbitMQ\Message\BaseMessage::fromArray([
    'key1' => 'value',
    'key2' => 'value2'
]))->onQueue('my-queue')

定义要发送消息的队列非常重要,我们可以使用 ::onQueue() 方法来定义。如前所述,每个应用程序应该只从自己的队列接收消息。因此,如果我们想使我们的消息被另一个应用程序处理,我们需要将其发送到该应用程序的队列。

非 Laravel 发送者的消息结构

JSON 示例

{
    "job": "producer",
    "data": { "key": "value" }
}

创建消费者

消费者类不过是生产者的“处理程序”。这个类必须处理生产者发送的数据。

namespace App\Jobs\Consumer;

use Girni\LaravelRabbitMQ\Consumer\ConsumerInterface;

class PingJobConsumer implements ConsumerInterface
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    public function producer(): string
    {
        return 'ping:job'; // this value must be same as Producer ::name() method we want to consume.
    }

    public function handle(MessageInterface $message): void
    {
        $key1 = $message->get('key1'); // value
        $key2 = $message->get('key2'); // value2
        
        \DB::table('test-table')->create(['key1' => $key1, 'key2' => $key2]);
    }
}

注册消费者

在您的应用程序中,在 config\laravel-rabbitmq.php 文件中的 consumers 数组中,您必须注册您想要在应用程序中使用每个消费者。您的配置应该看起来像

<?php

/*
 * You can place your custom package configuration in here.
 */
return [
    'consumers' => [
        \App\Jobs\Consumer\PingJobConsumer::class,
        \App\Jobs\Consumer\MyTestConsumer::class
    ]
];

消费消息

要开始消费产生的消息,我们需要运行我们的队列

php artisan rabbitmq:consume

或标准的 Laravel 流程

php artisan queue:work

高级用法 - 自定义消息类

作为标准消息存储,我们实现了 Girni\LaravelRabbitMQ\Message\BaseMessage::class,它具有简单的接口,操作数组键。

它包含一些辅助方法,如 ::get($key), ::set($key),以帮助客户端从消息中获取/设置数据。

然而,在某些情况下,我们的消息可能非常复杂,我们可能更喜欢将数据存储在具有面向对象接口的专用类(DTO)中。

我们可以通过将Girni\LaravelRabbitMQConsumer\HasCustomMessage接口添加到我们的消费者类中来实现。这个接口包含一个::message()方法。在这个方法的主体中,您应该传递一个全限定类名(FQCN)到您的类。

重要:请确保您的DTO类实现了Girni\LaravelRabbitMQ\Message\MessageInterface接口。我们库创建对象的全部逻辑都隐藏在::fromArray(array $data)方法中。

namespace App\Jobs\Consumer\Message;

use Girni\LaravelRabbitMQ\Message\MessageInterface;

class PingJobMessage implements MessageInterface
{
    private string $id;
    private string $name;
    private Carbon $date;

    public function __construct(string $id, string $name, Carbon $date)
    {
        $this->id = $id;
        $this->name = $name;
        $this->date = $date;
    }
    
    public  function getId(): string
    {
        return $this->id;
    }

    public  function getName(): string
    {
        return $this->name;
    }
    
    public  function getDate(): Carbon
    {
        return $this->date;
    }
    
    public static function fromArray(array $data): MessageInterface
    {
        return new self(
            $data['id'],
            $data['name'],
            Carbon::createFromFormat('Y-m-d H:i:s', $data['date'])
        );
    }

    public function toArray(): array
    {
        return [
            'id' => $this->id,
            'name' => $this->name,
            'date' => $this->date
        ];
    }
}

// consumer using the message

namespace App\Jobs\Consumer;

use Girni\LaravelRabbitMQ\Consumer\ConsumerInterface;
use Girni\LaravelRabbitMQ\Consumer\HasCustomMessage;

class PingJobConsumer implements ConsumerInterface, HasCustomMessage
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    public function producer(): string
    {
        return 'ping:job';
    }

    /**
    * @param \App\Jobs\Consumer\Message\PingJobMessage $message
    * @return void
     */
    public function handle(MessageInterface $message): void
    {
        \DB::table('test-table')->create(['name' => $message->getName(), 'date' => $message->getDate()->format('Y-m-d')]);
    }
    
    public function message(): string
    {
        return \App\Jobs\Consumer\Message\PingJobMessage::class;
    }
}

测试

composer test

变更日志

有关最近更改的更多信息,请参阅变更日志

许可证

MIT许可证(MIT)。有关更多信息,请参阅许可证文件