grimurrash / laravel-queue-rabbitmq
Laravel Queue 的 RabbitMQ 驱动。支持 Laravel Horizon。
Requires
- php: ^8.2
- ext-json: *
- grimurrash/php-amqplib: v8.2.7
- illuminate/queue: ^9.0|^10.0
Requires (Dev)
- laravel/framework: ^9.0|^10.0
- laravel/horizon: ^5.0
- laravel/pint: ^1.2
- mockery/mockery: ^1.0
- orchestra/testbench: ^7.0|^8.0
- phpunit/phpunit: ^9.3
Suggests
- ext-pcntl: Required to use all features of the queue consumer.
This package is auto-updated.
Last update: 2024-09-22 17:46:31 UTC
README
支持策略
只有最新版本将获得新功能。将按照以下方案提供错误修复:
安装
您可以通过以下命令使用 composer 安装此包:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
此包将自动注册自身。
配置
将连接添加到 config/queue.php
这是 rabbitMQ 连接/驱动器工作的最小配置。
'connections' => [ // ... 'rabbitmq' => [ 'driver' => 'rabbitmq', 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], // ... ], // ... ], // ... ],
可选队列配置
可选地将队列选项添加到连接的配置中。为该连接创建的每个队列都获得属性。
当您想要在消息延迟时优先处理消息时,可以通过添加额外选项来实现。
- 当省略最大优先级时,在使用的最大优先级为 2。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'prioritize_delayed' => false, 'queue_max_priority' => 10, ], ], ], // ... ],
当您想要发布与具有路由键的交换机相关的消息时,可以通过添加额外选项来实现。
- 当省略交换机时,RabbitMQ 将使用
amq.direct
交换机用于路由键。 - 当省略路由键时,默认的路由键是队列名称。
- 当在路由键中使用
%s
时,将替换队列名称。
注意:当使用带路由键的交换机时,您可能需要自己创建队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'exchange' => 'application-x', 'exchange_type' => 'topic', 'exchange_routing_key' => '', ], ], ], // ... ],
在 Laravel 中,失败的工作会存储到数据库中。但也许您想指示其他进程也处理消息。当您想指示 RabbitMQ 将失败消息重新路由到交换机或特定的队列时,可以通过添加额外选项来实现。
- 当省略交换机时,RabbitMQ 将使用
amq.direct
交换机用于路由键。 - 当省略路由键时,默认的路由键将用
queue
名称替换,后跟`.failed'
。 - 当在路由键中使用
%s
时,将替换队列名称。
注意:当使用带有路由键的失败工作交换机时,您可能需要自己创建交换机/队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'reroute_failed' => true, 'failed_exchange' => 'failed-exchange', 'failed_routing_key' => 'application-x.%s', ], ], ], // ... ],
Horizon 支持
从 8.0 版本开始,此包默认支持 Laravel Horizon。首先安装 Horizon,然后将 RABBITMQ_WORKER
设置为 horizon
。
Horizon 依赖于由工作进程分发的事件。这些事件通知 Horizon 消息/工作已执行的操作。
此库支持 Horizon,但在配置中您必须通知 Laravel 使用与 Horizon 兼容的 QueueApi。
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ], // ... ],
使用您自己的 RabbitMQJob 类
有时您必须处理由其他应用程序发布的消息。
这些消息可能不会遵循 Laravel 的作业有效负载模式。这些消息的问题在于,Laravel 工作进程将无法确定要执行的实际作业或类。
您可以通过扩展内置的 RabbitMQJob::class
并在队列连接配置中定义自己的类来扩展内置的类。当您在配置中指定 job
键时,使用您自己的类名,从代理检索的每个消息都将由您的类包装。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'job' => \App\Queue\Jobs\RabbitMQJob::class, ], ], ], // ... ],
您的自定义作业类示例
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Fire the job. * * @return void */ public function fire() { $payload = $this->payload(); $class = WhatheverClassNameToExecute::class; $method = 'handle'; ($this->instance = $this->resolve($class))->{$method}($this, $payload); $this->delete(); } }
或者,您可能想向有效负载添加额外的属性
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Get the decoded body of the job. * * @return array */ public function payload() { return [ 'job' => 'WhatheverFullyQualifiedClassNameToExecute@handle', 'data' => json_decode($this->getRawBody(), true) ]; } }
如果您想处理原始消息(不是 JSON 格式或 JSON 中没有 'job' 键),则应添加用于 getName
方法的占位符
<?php namespace App\Queue\Jobs; use Illuminate\Support\Facades\Log; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { public function fire() { $anyMessage = $this->getRawBody(); Log::info($anyMessage); $this->delete(); } public function getName() { return ''; } }
使用您自己的连接
您可以通过扩展内置的 PhpAmqpLib\Connection\AMQPStreamConnection::class
或 PhpAmqpLib\Connection\AMQPSLLConnection::class
来扩展内置的类,并在连接配置中定义自己的类。当您在配置中指定 connection
键时,使用您自己的类名,每个连接都将使用您的类。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, ], // ... ],
使用您自己的Worker类
如果您想使用自己的RabbitMQQueue::class
,可以通过扩展VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
来实现。然后通过设置RABBITMQ_WORKER
为\App\Queue\RabbitMQQueue::class
来告知Laravel使用您的类。
注意:Worker类必须扩展
VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to a class if you wish to use your own. */ 'worker' => \App\Queue\RabbitMQQueue::class, ], // ... ],
<?php namespace App\Queue; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { // ... }
例如:重连实现。
如果您希望在连接断开时重新连接到RabbitMQ,可以覆盖发布和创建通道的方法。
注意:这不是最佳实践,只是一个示例。
<?php namespace App\Queue; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } } protected function publishBatch($jobs, $data = '', $queue = null): void { try { parent::publishBatch($jobs, $data, $queue); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBatch($jobs, $data, $queue); } } protected function createChannel(): AMQPChannel { try { return parent::createChannel(); } catch (AMQPConnectionClosedException) { $this->reconnect(); return parent::createChannel(); } } }
默认队列
当Laravel没有提供队列时,连接将使用默认队列,其值为'default'。您可以通过在连接配置中添加额外参数来更改默认队列。
'connections' => [ // ... 'rabbitmq' => [ // ... 'queue' => env('RABBITMQ_QUEUE', 'default'), ], // ... ],
心跳
默认情况下,您的连接将创建一个心跳设置为0
的连接。您可以通过更改配置来修改心跳设置。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ // ... 'heartbeat' => 10, ], ], // ... ],
SSL安全
如果您需要与rabbitMQ服务器进行安全连接,需要添加这些额外的配置选项。
'connections' => [ // ... 'rabbitmq' => [ // ... 'secure' = > true, 'options' => [ // ... 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], ], ], // ... ],
数据库提交后的事件
指示Laravel工作器在所有数据库提交完成后派发事件。
'connections' => [ // ... 'rabbitmq' => [ // ... 'after_commit' => true, ], // ... ],
懒加载连接
默认情况下,您的连接将创建为懒加载连接。如果出于某种原因您不希望连接懒加载,可以通过以下配置将其关闭。
'connections' => [ // ... 'rabbitmq' => [ // ... 'lazy' = > false, ], // ... ],
网络协议
默认情况下,用于连接的网络协议是tcp。如果出于某种原因您想使用其他网络协议,可以在配置选项中添加额外值。支持的协议:tcp
、ssl
、tls
'connections' => [ // ... 'rabbitmq' => [ // ... 'network_protocol' => 'tcp', ], // ... ],
Octane支持
从13.3.0版本开始,此包默认支持Laravel Octane。首先,安装Octane,并不要忘记在octane配置中预热'rabbitmq'连接。
Laravel使用
完成配置后,您可以使用Laravel队列API。如果您使用过其他队列驱动程序,则不需要更改其他任何内容。如果您不知道如何使用队列API,请参阅官方Laravel文档:https://laravel.net.cn/docs/queues
Lumen使用
对于Lumen使用,应在bootstrap/app.php
中手动注册服务提供程序,如下所示:
$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);
消费消息
有两种方式来消费消息。
-
queue:work
命令是Laravel内置的命令。该命令使用basic_get
。如果您想消费多个队列,请使用此命令。 -
rabbitmq:consume
命令是由此包提供的。该命令使用basic_consume
,比basic_get
性能高约2倍,但不支持多个队列。
测试
使用docker-compose
设置RabbitMQ
docker compose up -d
要运行测试套件,可以使用以下命令:
# To run both style and unit tests. composer test # To run only style tests. composer test:style # To run only unit tests. composer test:unit
如果您从样式测试中收到任何错误,您可以使用以下命令自动修复大多数,如果不是全部问题。
composer fix:style
贡献
您可以通过发现错误并打开问题来为此包做出贡献。请记住,在创建pull request或问题时要添加包的版本。(例如:[5.2] 延迟作业上的致命错误)