vladimir-yuldashev / laravel-queue-rabbitmq
Laravel Queue 的 RabbitMQ 驱动。支持 Laravel Horizon。
Requires
- php: ^8.0
- ext-json: *
- illuminate/queue: ^10.0|^11.0
- php-amqplib/php-amqplib: ^v3.6
Requires (Dev)
- laravel/framework: ^9.0|^10.0|^11.0
- laravel/horizon: ^5.0
- laravel/pint: ^1.2
- mockery/mockery: ^1.0
- orchestra/testbench: ^7.0|^8.0|^9.0
- phpunit/phpunit: ^10.0|^11.0
Suggests
- ext-pcntl: Required to use all features of the queue consumer.
- v14.1.0
- v14.0.0
- v13.3.5
- v13.3.4
- v13.3.3
- 13.3.2
- v13.3.1
- v13.3.0
- v13.2.0
- v13.1.0
- dev-master / 13.0.x-dev
- v13.0.1
- v13.0.0
- v12.0.x-dev
- v12.0.1
- v12.0.0
- v11.3.0
- v11.2.0
- v11.1.2
- v11.1.1
- v11.1.0
- v11.0.x-dev
- v11.0.2
- v11.0.1
- v11.0.0
- v10.2.3
- v10.2.2
- v10.2.1
- v10.2.0
- v10.1.3
- v10.1.2
- v10.1.1
- v10.1.0
- v10.0.x-dev
- v10.0.2
- v10.0.1
- v10.0.0
- v9.1.2
- v9.1.1
- v9.1.0
- v9.0.x-dev
- v9.0.0
- v8.3.0
- v8.2.0
- v8.1.0
- v8.0.x-dev
- v8.0.0
- v7.5.0
- v7.4.2
- v7.4.1
- v7.4.0
- v7.3.0
- v7.2.0
- v7.1.2
- v7.1.1
- v7.1.0
- v7.0.x-dev
- v7.0.2
- v7.0.1
- v7.0.0
- v6.0.x-dev
- v6.0.3
- v6.0.2
- v6.0.1
- v6.0.0
- v5.5.x-dev
- 5.5
- v5.4.x-dev
- 5.4
- v5.3.x-dev
- 5.3
- v5.2.x-dev
- 5.2
- v5.1.x-dev
- 5.1
- v5.0.x-dev
- 5.0
- v4.2.x-dev
- 4.2
- 4.1
- 4.0
- dev-l11
- dev-verifypeer
- dev-fix-lastpushed
- dev-changelog-1331
- dev-feature/support-octane-reconnect
- dev-changelog-13x-release-13_3_0
- dev-php82
- dev-newversions
- dev-octane
- dev-feature/abbility-to-hook-into-the-created-connection
- dev-after-creating-connection-callback
- dev-feature/use-custom-job-class-v10
This package is auto-updated.
Last update: 2024-09-08 17:25:25 UTC
README
支持策略
只有最新版本将获得新功能。错误修复将通过以下方案提供
安装
您可以通过以下命令使用 composer 安装此包
composer require vladimir-yuldashev/laravel-queue-rabbitmq
此包将自动注册自己。
配置
将连接添加到 config/queue.php
这是 rabbitMQ 连接/驱动程序工作的最小配置。
'connections' => [ // ... 'rabbitmq' => [ 'driver' => 'rabbitmq', 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], // ... ], // ... ], // ... ],
可选队列配置
可选地向连接的配置中添加队列选项。为该连接创建的每个队列都将获得这些属性。
当您想在消息延迟时优先处理消息时,可以通过添加额外选项来实现。
- 当省略 max-priority 时,当使用时最大优先级设置为 2。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'prioritize_delayed' => false, 'queue_max_priority' => 10, ], ], ], // ... ],
当您想针对具有路由键的交换机发布消息时,可以通过添加额外选项来实现。
- 当省略交换机时,RabbitMQ 将使用
amq.direct
交换机用于路由键。 - 当省略路由键时,默认路由键是队列名称。
- 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用具有路由键的交换机时,您可能需要自己创建队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'exchange' => 'application-x', 'exchange_type' => 'topic', 'exchange_routing_key' => '', ], ], ], // ... ],
在 Laravel 中,失败的任务存储到数据库中。但您可能希望指示其他进程也处理消息。当您想指示 RabbitMQ 将失败消息重新路由到交换机或特定队列时,可以通过添加额外选项来实现。
- 当省略交换机时,RabbitMQ 将使用
amq.direct
交换机用于路由键。 - 当省略路由键时,默认路由键将用
queue
名称替换为'.failed'
。 - 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用具有路由键的 failed_job 交换机时,您可能需要自己创建交换机/队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'reroute_failed' => true, 'failed_exchange' => 'failed-exchange', 'failed_routing_key' => 'application-x.%s', ], ], ], // ... ],
Horizon 支持
从 8.0 版本开始,此包支持 Laravel Horizon。首先安装 Horizon,然后将 RABBITMQ_WORKER
设置为 horizon
。
Horizon 依赖于由工作进程发送的事件。这些事件通知 Horizon 消息/任务的处理情况。
此库支持 Horizon,但在配置中您必须通知 Laravel 使用与 Horizon 兼容的 QueueApi。
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ], // ... ],
使用自己的 RabbitMQJob 类
有时您必须处理由其他应用程序发布的消息。
这些消息可能不会遵循 Laravel 的作业有效负载模式。这些消息的问题在于,Laravel 工作进程无法确定实际作业或要执行的类。
您可以通过扩展内置的 RabbitMQJob::class
并在队列连接配置中定义自己的类,来自定义 job
键在配置中,每个从代理检索的消息都将被您的类包装。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'job' => \App\Queue\Jobs\RabbitMQJob::class, ], ], ], // ... ],
自定义作业类的示例
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Fire the job. * * @return void */ public function fire() { $payload = $this->payload(); $class = WhatheverClassNameToExecute::class; $method = 'handle'; ($this->instance = $this->resolve($class))->{$method}($this, $payload); $this->delete(); } }
或者,您可能想向有效负载添加额外属性
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Get the decoded body of the job. * * @return array */ public function payload() { return [ 'job' => 'WhatheverFullyQualifiedClassNameToExecute@handle', 'data' => json_decode($this->getRawBody(), true) ]; } }
如果您想处理非 JSON 格式或没有 'job' 键的 JSON 的原始消息,您应该为 getName
方法添加存根
<?php namespace App\Queue\Jobs; use Illuminate\Support\Facades\Log; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { public function fire() { $anyMessage = $this->getRawBody(); Log::info($anyMessage); $this->delete(); } public function getName() { return ''; } }
使用自己的连接
您可以扩展内置的 PhpAmqpLib\Connection\AMQPStreamConnection::class
或 PhpAmqpLib\Connection\AMQPSLLConnection::class
,并在连接配置中定义自己的类。当您在配置中指定 connection
键并使用自己的类名时,每个连接都将使用您自己的类。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, ], // ... ],
使用自己的 Worker 类
如果您想使用自己的 RabbitMQQueue::class
,可以通过扩展 VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
实现。通过将 RABBITMQ_WORKER
设置为 \App\Queue\RabbitMQQueue::class
来通知 Laravel 使用您的类。
注意:Worker 类 必须 扩展
VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to a class if you wish to use your own. */ 'worker' => \App\Queue\RabbitMQQueue::class, ], // ... ],
<?php namespace App\Queue; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { // ... }
例如:重连实现。
如果您想在连接断开时重连到 RabbitMQ,可以重写发布和创建通道的方法。
注意:这不是最佳实践,仅为例子。
<?php namespace App\Queue; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } } protected function publishBatch($jobs, $data = '', $queue = null): void { try { parent::publishBatch($jobs, $data, $queue); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBatch($jobs, $data, $queue); } } protected function createChannel(): AMQPChannel { try { return parent::createChannel(); } catch (AMQPConnectionClosedException) { $this->reconnect(); return parent::createChannel(); } } }
默认队列
当 Laravel 没有提供队列时,连接会使用默认的队列,值为 'default'。您可以通过在连接配置中添加额外的参数来更改默认队列。
'connections' => [ // ... 'rabbitmq' => [ // ... 'queue' => env('RABBITMQ_QUEUE', 'default'), ], // ... ],
心跳
默认情况下,您的连接将使用心跳设置 0
创建。您可以通过更改配置来更改心跳设置。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ // ... 'heartbeat' => 10, ], ], // ... ],
SSL 安全
如果您需要与 rabbitMQ 服务器建立安全连接,您需要添加这些额外的配置选项。
'connections' => [ // ... 'rabbitmq' => [ // ... 'secure' = > true, 'options' => [ // ... 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], ], ], // ... ],
数据库提交后的事件
为了指导 Laravel 工作者在所有数据库提交完成后分发事件。
'connections' => [ // ... 'rabbitmq' => [ // ... 'after_commit' => true, ], // ... ],
懒加载连接
默认情况下,您的连接将创建为懒加载连接。如果出于某种原因您不想使用懒加载连接,可以通过以下配置将其关闭。
'connections' => [ // ... 'rabbitmq' => [ // ... 'lazy' = > false, ], // ... ],
网络协议
默认情况下,用于连接的网络协议是 tcp。如果出于某种原因您想使用其他网络协议,可以在配置选项中添加额外的值。可用协议:tcp
、ssl
、tls
'connections' => [ // ... 'rabbitmq' => [ // ... 'network_protocol' => 'tcp', ], // ... ],
Octane 支持
从 13.3.0 版本开始,此软件包默认支持 Laravel Octane。首先,安装 Octane 并不要忘记在 octane 配置中预热 'rabbitmq' 连接。
Laravel 使用
配置完成后,您可以使用 Laravel 队列 API。如果您使用了其他队列驱动,则不需要进行其他更改。如果您不知道如何使用队列 API,请参阅官方 Laravel 文档:https://laravel.net.cn/docs/queues
Lumen 使用
对于 Lumen 使用,应该在 bootstrap/app.php
中手动注册服务提供者,如下所示。
$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);
消费消息
有两种消费消息的方法。
-
queue:work
命令是 Laravel 内置的命令。此命令利用basic_get
。如果您想消费多个队列,请使用此命令。 -
rabbitmq:consume
命令是由此软件包提供的。此命令利用basic_consume
,比basic_get
性能高约 2 倍,但不支持多个队列。
测试
使用 docker-compose
设置 RabbitMQ
docker compose up -d
要运行测试套件,可以使用以下命令
# To run both style and unit tests. composer test # To run only style tests. composer test:style # To run only unit tests. composer test:unit
如果您收到样式测试的错误,您可以使用以下命令自动修复大部分,如果不是所有的问题
composer fix:style
贡献
您可以通过发现错误并打开问题来为此软件包做出贡献。请添加您创建拉取请求或问题的软件包版本。(例如,[5.2] 延迟作业上的致命错误)