yickson/laravel-queue-rabbitmq

Laravel Queue 的 RabbitMQ 驱动。支持 Laravel Horizon。支持 PHP-8


README

Latest Stable Version Build Status Total Downloads StyleCI License

支持政策

只有最新版本将获得新功能。将通过以下方案提供错误修复

安装

您可以使用以下命令通过 composer 安装此包

composer require vladimir-yuldashev/laravel-queue-rabbitmq

此包将自动注册自己。

将连接添加到 config/queue.php

'connections' => [
    // ...

    'rabbitmq' => [
    
       'driver' => 'rabbitmq',
       'queue' => env('RABBITMQ_QUEUE', 'default'),
       'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
   
       'hosts' => [
           [
               'host' => env('RABBITMQ_HOST', '127.0.0.1'),
               'port' => env('RABBITMQ_PORT', 5672),
               'user' => env('RABBITMQ_USER', 'guest'),
               'password' => env('RABBITMQ_PASSWORD', 'guest'),
               'vhost' => env('RABBITMQ_VHOST', '/'),
           ],
       ],
   
       'options' => [
           'ssl_options' => [
               'cafile' => env('RABBITMQ_SSL_CAFILE', null),
               'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
               'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
               'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
               'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
           ],
           'queue' => [
               'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
           ],
       ],
   
       /*
        * Set to "horizon" if you wish to use Laravel Horizon.
        */
       'worker' => env('RABBITMQ_WORKER', 'default'),
        
    ],

    // ...    
],

可选配置

可选地向连接的配置中添加队列选项。为该连接创建的每个队列都将获得这些属性。

当您想按延迟时消息的优先级进行排序时,可以通过添加额外选项来实现。

  • 当省略最大优先级时,在使用时最大优先级设置为 2。
'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'prioritize_delayed_messages' =>  false,
                'queue_max_priority' => 10,
            ],
        ],
    ],

    // ...    
],

当您想通过带有路由键的交换机发布消息时,可以通过添加额外选项来实现。

  • 当省略交换机时,RabbitMQ 将使用 amq.direct 交换机进行路由键。
  • 当省略路由键时,默认路由键是 queue 名称。
  • 当在路由键中使用 %s 时,队列名称将被替换。

注意:当使用带有路由键的交换机时,您可能需要自己创建队列的绑定。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'exchange' => 'application-x',
                'exchange_type' => 'topic',
                'exchange_routing_key' => '',
            ],
        ],
    ],

    // ...    
],

当您想指示其他进程也处理消息时,Laravel 将失败作业存储到数据库中。但也许您想指示某些其他进程也处理消息。当您想指示 RabbitMQ 将失败消息重路由到交换机或特定队列时,可以通过添加额外选项来实现。

  • 当省略交换机时,RabbitMQ 将使用 amq.direct 交换机进行路由键。
  • 当省略路由键时,默认路由键是 queue 名称,替换为 '.failed'
  • 当在路由键中使用 %s 时,队列名称将被替换。

注意:当使用带有路由键的失败作业交换机时,您可能需要自己创建交换机/队列的绑定。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'reroute_failed' => true,
                'failed_exchange' => 'failed-exchange',
                'failed_routing_key' => 'application-x.%s',
            ],
        ],
    ],

    // ...    
],

使用您自己的 RabbitMQJob 类

有时您必须处理由其他应用程序发布的消息。
这些消息可能不会遵守 Laravel 作业有效负载模式。这些消息的问题在于,Laravel 工作进程无法确定实际作业或要执行的类。

您可以通过在队列连接配置中定义自己的类来扩展内置的 RabbitMQJob::class。当您在配置中指定 job 键并使用自己的类名时,从代理检索的每个消息都将被您的类包装。

配置示例

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'job' => \App\Queue\Jobs\RabbitMQJob::class,
            ],
        ],
    ],

    // ...    
],

您自己的作业类示例

<?php

namespace App\Queue\Jobs;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{

    /**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();

        $class = WhatheverClassNameToExecute::class;
        $method = 'handle';

        ($this->instance = $this->resolve($class))->{$method}($this, $payload);
    }
}

或者您可能想向有效负载添加额外的属性

<?php

namespace App\Queue\Jobs;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{
   /**
     * Get the decoded body of the job.
     *
     * @return array
     */
    public function payload()
    {
        return [
            'job'  => 'WhatheverFullyQualifiedClassNameToExecute@handle',
            'data' => json_decode($this->getRawBody(), true)
        ];
    }
}

Laravel 使用

完成配置后,您可以使用 Laravel 队列 API。如果您使用过其他队列驱动,则无需更改其他任何内容。如果您不知道如何使用队列 API,请参阅官方 Laravel 文档:https://laravel.net.cn/docs/queues

Laravel Horizon 使用

从 8.0 版本开始,此包默认支持 Laravel Horizon。首先安装 Horizon,然后将 RABBITMQ_WORKER 设置为 horizon

Lumen 使用

对于 Lumen 的使用,服务提供者应手动在 bootstrap/app.php 中注册,如下所示

$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);

消费消息

有两种方法可以消费消息。

  1. queue:work 命令是 Laravel 内置的命令。此命令使用 basic_get

  2. rabbitmq:consume 命令由本包提供。此命令使用 basic_consume,比 basic_get 性能高约 2 倍。

测试

使用 docker-compose 设置 RabbitMQ

docker-compose up -d rabbitmq

要运行测试套件,您可以使用以下命令

# To run both style and unit tests.
composer test

# To run only style tests.
composer test:style

# To run only unit tests.
composer test:unit

如果您在样式测试中遇到任何错误,您可以使用以下命令自动修复大部分,如果不是所有的问题

composer fix:style

贡献

您可以通过发现错误和创建问题来为此软件包做出贡献。请说明您创建的拉取请求或问题的软件包版本。(例如:[5.2] 延迟作业上的致命错误)