grimurrash/laravel-queue-rabbitmq

Laravel Queue 的 RabbitMQ 驱动。支持 Laravel Horizon。

v8.2.7 2023-08-22 15:07 UTC

This package is auto-updated.

Last update: 2024-09-22 17:46:31 UTC


README

Latest Stable Version Build Status Total Downloads License

支持策略

只有最新版本将获得新功能。将按照以下方案提供错误修复:

安装

您可以通过以下命令使用 composer 安装此包:

composer require vladimir-yuldashev/laravel-queue-rabbitmq

此包将自动注册自身。

配置

将连接添加到 config/queue.php

这是 rabbitMQ 连接/驱动器工作的最小配置。

'connections' => [
    // ...

    'rabbitmq' => [
    
       'driver' => 'rabbitmq',
       'hosts' => [
           [
               'host' => env('RABBITMQ_HOST', '127.0.0.1'),
               'port' => env('RABBITMQ_PORT', 5672),
               'user' => env('RABBITMQ_USER', 'guest'),
               'password' => env('RABBITMQ_PASSWORD', 'guest'),
               'vhost' => env('RABBITMQ_VHOST', '/'),
           ],
           // ...
       ],

       // ...
    ],

    // ...    
],

可选队列配置

可选地将队列选项添加到连接的配置中。为该连接创建的每个队列都获得属性。

当您想要在消息延迟时优先处理消息时,可以通过添加额外选项来实现。

  • 当省略最大优先级时,在使用的最大优先级为 2。
'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'prioritize_delayed' =>  false,
                'queue_max_priority' => 10,
            ],
        ],
    ],

    // ...    
],

当您想要发布与具有路由键的交换机相关的消息时,可以通过添加额外选项来实现。

  • 当省略交换机时,RabbitMQ 将使用 amq.direct 交换机用于路由键。
  • 当省略路由键时,默认的路由键是队列名称。
  • 当在路由键中使用 %s 时,将替换队列名称。

注意:当使用带路由键的交换机时,您可能需要自己创建队列的绑定。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'exchange' => 'application-x',
                'exchange_type' => 'topic',
                'exchange_routing_key' => '',
            ],
        ],
    ],

    // ...    
],

在 Laravel 中,失败的工作会存储到数据库中。但也许您想指示其他进程也处理消息。当您想指示 RabbitMQ 将失败消息重新路由到交换机或特定的队列时,可以通过添加额外选项来实现。

  • 当省略交换机时,RabbitMQ 将使用 amq.direct 交换机用于路由键。
  • 当省略路由键时,默认的路由键将用 queue 名称替换,后跟 `.failed'
  • 当在路由键中使用 %s 时,将替换队列名称。

注意:当使用带有路由键的失败工作交换机时,您可能需要自己创建交换机/队列的绑定。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'reroute_failed' => true,
                'failed_exchange' => 'failed-exchange',
                'failed_routing_key' => 'application-x.%s',
            ],
        ],
    ],

    // ...    
],

Horizon 支持

从 8.0 版本开始,此包默认支持 Laravel Horizon。首先安装 Horizon,然后将 RABBITMQ_WORKER 设置为 horizon

Horizon 依赖于由工作进程分发的事件。这些事件通知 Horizon 消息/工作已执行的操作。

此库支持 Horizon,但在配置中您必须通知 Laravel 使用与 Horizon 兼容的 QueueApi。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        /* Set to "horizon" if you wish to use Laravel Horizon. */
       'worker' => env('RABBITMQ_WORKER', 'default'),
    ],

    // ...    
],

使用您自己的 RabbitMQJob 类

有时您必须处理由其他应用程序发布的消息。
这些消息可能不会遵循 Laravel 的作业有效负载模式。这些消息的问题在于,Laravel 工作进程将无法确定要执行的实际作业或类。

您可以通过扩展内置的 RabbitMQJob::class 并在队列连接配置中定义自己的类来扩展内置的类。当您在配置中指定 job 键时,使用您自己的类名,从代理检索的每个消息都将由您的类包装。

配置示例

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            'queue' => [
                // ...

                'job' => \App\Queue\Jobs\RabbitMQJob::class,
            ],
        ],
    ],

    // ...    
],

您的自定义作业类示例

<?php

namespace App\Queue\Jobs;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{

    /**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();

        $class = WhatheverClassNameToExecute::class;
        $method = 'handle';

        ($this->instance = $this->resolve($class))->{$method}($this, $payload);

        $this->delete();
    }
}

或者,您可能想向有效负载添加额外的属性

<?php

namespace App\Queue\Jobs;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{
   /**
     * Get the decoded body of the job.
     *
     * @return array
     */
    public function payload()
    {
        return [
            'job'  => 'WhatheverFullyQualifiedClassNameToExecute@handle',
            'data' => json_decode($this->getRawBody(), true)
        ];
    }
}

如果您想处理原始消息(不是 JSON 格式或 JSON 中没有 'job' 键),则应添加用于 getName 方法的占位符

<?php

namespace App\Queue\Jobs;

use Illuminate\Support\Facades\Log;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{
    public function fire()
    {
        $anyMessage = $this->getRawBody();
        Log::info($anyMessage);

        $this->delete();
    }

    public function getName()
    {
        return '';
    }
}

使用您自己的连接

您可以通过扩展内置的 PhpAmqpLib\Connection\AMQPStreamConnection::classPhpAmqpLib\Connection\AMQPSLLConnection::class 来扩展内置的类,并在连接配置中定义自己的类。当您在配置中指定 connection 键时,使用您自己的类名,每个连接都将使用您的类。

配置示例

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class,
    ],

    // ...    
],

使用您自己的Worker类

如果您想使用自己的RabbitMQQueue::class,可以通过扩展VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue来实现。然后通过设置RABBITMQ_WORKER\App\Queue\RabbitMQQueue::class来告知Laravel使用您的类。

注意:Worker类必须扩展VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        /* Set to a class if you wish to use your own. */
       'worker' => \App\Queue\RabbitMQQueue::class,
    ],

    // ...    
],
<?php

namespace App\Queue;

use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{
    // ...
}

例如:重连实现。

如果您希望在连接断开时重新连接到RabbitMQ,可以覆盖发布和创建通道的方法。

注意:这不是最佳实践,只是一个示例。

<?php

namespace App\Queue;

use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{

    protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
    {
        try {
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        }
    }

    protected function publishBatch($jobs, $data = '', $queue = null): void
    {
        try {
            parent::publishBatch($jobs, $data, $queue);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBatch($jobs, $data, $queue);
        }
    }

    protected function createChannel(): AMQPChannel
    {
        try {
            return parent::createChannel();
        } catch (AMQPConnectionClosedException) {
            $this->reconnect();
            return parent::createChannel();
        }
    }
}

默认队列

当Laravel没有提供队列时,连接将使用默认队列,其值为'default'。您可以通过在连接配置中添加额外参数来更改默认队列。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...
            
        'queue' => env('RABBITMQ_QUEUE', 'default'),
    ],

    // ...    
],

心跳

默认情况下,您的连接将创建一个心跳设置为0的连接。您可以通过更改配置来修改心跳设置。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'options' => [
            // ...

            'heartbeat' => 10,
        ],
    ],

    // ...    
],

SSL安全

如果您需要与rabbitMQ服务器进行安全连接,需要添加这些额外的配置选项。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'secure' = > true,
        'options' => [
            // ...

            'ssl_options' => [
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],
        ],
    ],

    // ...    
],

数据库提交后的事件

指示Laravel工作器在所有数据库提交完成后派发事件。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'after_commit' => true,
    ],

    // ...    
],

懒加载连接

默认情况下,您的连接将创建为懒加载连接。如果出于某种原因您不希望连接懒加载,可以通过以下配置将其关闭。

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'lazy' = > false,
    ],

    // ...    
],

网络协议

默认情况下,用于连接的网络协议是tcp。如果出于某种原因您想使用其他网络协议,可以在配置选项中添加额外值。支持的协议:tcpssltls

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        'network_protocol' => 'tcp',
    ],

    // ...    
],

Octane支持

从13.3.0版本开始,此包默认支持Laravel Octane。首先,安装Octane,并不要忘记在octane配置中预热'rabbitmq'连接。

参阅:vyuldashev#460 (评论)

Laravel使用

完成配置后,您可以使用Laravel队列API。如果您使用过其他队列驱动程序,则不需要更改其他任何内容。如果您不知道如何使用队列API,请参阅官方Laravel文档:https://laravel.net.cn/docs/queues

Lumen使用

对于Lumen使用,应在bootstrap/app.php中手动注册服务提供程序,如下所示:

$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);

消费消息

有两种方式来消费消息。

  1. queue:work命令是Laravel内置的命令。该命令使用basic_get。如果您想消费多个队列,请使用此命令。

  2. rabbitmq:consume命令是由此包提供的。该命令使用basic_consume,比basic_get性能高约2倍,但不支持多个队列。

测试

使用docker-compose设置RabbitMQ

docker compose up -d

要运行测试套件,可以使用以下命令:

# To run both style and unit tests.
composer test

# To run only style tests.
composer test:style

# To run only unit tests.
composer test:unit

如果您从样式测试中收到任何错误,您可以使用以下命令自动修复大多数,如果不是全部问题。

composer fix:style

贡献

您可以通过发现错误并打开问题来为此包做出贡献。请记住,在创建pull request或问题时要添加包的版本。(例如:[5.2] 延迟作业上的致命错误)