semnickolas / laravel-queue-rabbitmq
Laravel Queue 的 RabbitMQ 驱动程序
Requires
- php: ^8.0
- ext-json: *
- illuminate/queue: ^9.0|^10.0
- php-amqplib/php-amqplib: ^v3.5.2
Requires (Dev)
- laravel/framework: ^9.0|^10.0
- laravel/horizon: ^5.0
- laravel/pint: ^1.2
- mockery/mockery: ^1.0
- orchestra/testbench: ^7.0|^8.0
- phpunit/phpunit: ^9.3
Suggests
- ext-pcntl: Required to use all features of the queue consumer.
- v13.3.4
- v13.3.3
- v13.3.2
- v13.3.1
- v13.3.0
- v13.2.0
- v13.1.0
- dev-master / 13.0.x-dev
- v13.0.1
- v13.0.0
- v12.0.x-dev
- v12.0.1
- v12.0.0
- v11.3.0
- v11.2.0
- v11.1.2
- v11.1.1
- v11.1.0
- v11.0.x-dev
- v11.0.2
- v11.0.1
- v11.0.0
- v10.2.3
- v10.2.2
- v10.2.1
- v10.2.0
- v10.1.3
- v10.1.2
- v10.1.1
- v10.1.0
- v10.0.x-dev
- v10.0.2
- v10.0.1
- v10.0.0
- v9.1.2
- v9.1.1
- v9.1.0
- v9.0.x-dev
- v9.0.0
- v8.3.0
- v8.2.0
- v8.1.0
- v8.0.x-dev
- v8.0.0
- v7.5.0
- v7.4.2
- v7.4.1
- v7.4.0
- v7.3.0
- v7.2.0
- v7.1.2
- v7.1.1
- v7.1.0
- v7.0.x-dev
- v7.0.2
- v7.0.1
- v7.0.0
- v6.0.x-dev
- v6.0.3
- v6.0.2
- v6.0.1
- v6.0.0
- v5.5.x-dev
- 5.5
- v5.4.x-dev
- 5.4.2
- 5.4
- v5.3.x-dev
- 5.3
- v5.2.x-dev
- 5.2
- v5.1.x-dev
- 5.1
- v5.0.x-dev
- 5.0
- v4.2.x-dev
- 4.2
- 4.1
- 4.0
- dev-fix/pint-nullable-type-declarations
- dev-newverifypeer
- dev-adm-bome-patch-1
- dev-verifypeer
- dev-fix-lastpushed
- dev-changelog-1331
- dev-feature/support-octane-reconnect
- dev-changelog-13x-release-13_3_0
- dev-php82
- dev-newversions
- dev-octane
- dev-feature/abbility-to-hook-into-the-created-connection
- dev-after-creating-connection-callback
- dev-feature/use-custom-job-class-v10
This package is auto-updated.
Last update: 2024-09-24 17:01:04 UTC
README
支持政策
只有最新版本会获得新功能。将使用以下方案提供错误修复:
安装
您可以通过以下命令使用 composer 安装此包:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
该包将自动注册自身。
配置
将连接添加到 config/queue.php
这是 rabbitMQ 连接/驱动程序工作的最小配置。
'connections' => [ // ... 'rabbitmq' => [ 'driver' => 'rabbitmq', 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], // ... ], // ... ], // ... ],
可选队列配置
可选地将队列选项添加到连接的配置中。为此连接创建的每个队列都将获得属性。
当您想对延迟的消息进行优先排序时,可以通过添加额外选项来实现。
- 如果省略了最大优先级,则在使用时将最大优先级设置为 2。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'prioritize_delayed' => false, 'queue_max_priority' => 10, ], ], ], // ... ],
当您想针对具有路由键的交换机发布消息时,可以通过添加额外选项来实现。
- 如果省略了交换机,RabbitMQ 将使用
amq.direct
交换机进行路由键。 - 如果省略了路由键,默认路由键是
queue
名称。 - 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用具有路由键的交换机时,您可能需要自己创建队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'exchange' => 'application-x', 'exchange_type' => 'topic', 'exchange_routing_key' => '', ], ], ], // ... ],
在 Laravel 中,失败的作业存储在数据库中。但您可能希望指示其他进程也对消息执行某些操作。当您想指示 RabbitMQ 将失败消息重新路由到交换机或特定队列时,可以通过添加额外选项来实现。
- 如果省略了交换机,RabbitMQ 将使用
amq.direct
交换机进行路由键。 - 如果省略了路由键,默认将
queue
名称替换为'.failed'
。 - 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用具有路由键的失败作业交换机时,您可能需要自己创建交换机/队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'reroute_failed' => true, 'failed_exchange' => 'failed-exchange', 'failed_routing_key' => 'application-x.%s', ], ], ], // ... ],
Horizon 支持
从 8.0 版本开始,此包支持 Laravel Horizon。首先安装 Horizon,然后设置 RABBITMQ_WORKER
为 horizon
。
Horizon 依赖于由工作进程发出的事件。这些事件通知 Horizon 对消息/作业做了什么处理。
此库支持 Horizon,但在配置中您必须通知 Laravel 使用与 horizon 兼容的 QueueApi。
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ], // ... ],
使用您自己的 RabbitMQJob 类
有时您必须处理由其他应用程序发布的消息。
这些消息可能不会遵守 Laravel 的作业有效负载模式。这些消息的问题在于,Laravel 工作进程无法确定实际作业或要执行的类。
您可以通过扩展内置的 RabbitMQJob::class
并在队列连接配置中定义自己的类来定义自己的类。当您在配置中指定一个 job
键并使用自己的类名时,从代理中检索的每个消息都将由您的类包装。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'job' => \App\Queue\Jobs\RabbitMQJob::class, ], ], ], // ... ],
您自己的作业类示例
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Fire the job. * * @return void */ public function fire() { $payload = $this->payload(); $class = WhatheverClassNameToExecute::class; $method = 'handle'; ($this->instance = $this->resolve($class))->{$method}($this, $payload); $this->delete(); } }
或者,您可能想要添加额外的属性到有效负载中
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Get the decoded body of the job. * * @return array */ public function payload() { return [ 'job' => 'WhatheverFullyQualifiedClassNameToExecute@handle', 'data' => json_decode($this->getRawBody(), true) ]; } }
如果您想处理原始消息,而不是 JSON 格式的消息或在 JSON 中没有 'job' 键,您应该为 getName
方法添加存根
<?php namespace App\Queue\Jobs; use Illuminate\Support\Facades\Log; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { public function fire() { $anyMessage = $this->getRawBody(); Log::info($anyMessage); $this->delete(); } public function getName() { return ''; } }
使用您自己的连接
您可以通过扩展内置的 PhpAmqpLib\Connection\AMQPStreamConnection::class
或 PhpAmqpLib\Connection\AMQPSLLConnection::class
来扩展内置的类,并在连接配置中定义自己的类。当您在配置中指定一个 connection
键并使用自己的类名时,每个连接都将使用您的类。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'connection' = > \App\Queue\Connection\MyRabbitMQConnection::class, ], // ... ],
使用您自己的 Worker 类
如果您想使用自己的 RabbitMQQueue::class
,可以通过扩展 VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
来实现。并通过将 RABBITMQ_WORKER
设置为 \App\Queue\RabbitMQQueue::class
来通知 Laravel 使用您的类。
注意:工作类 必须 扩展
VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue
'connections' => [ // ... 'rabbitmq' => [ // ... /* Set to a class if you wish to use your own. */ 'worker' => \App\Queue\RabbitMQQueue::class, ], // ... ],
<?php namespace App\Queue; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { // ... }
例如:重连实现。
如果您想在连接断开时重新连接到 RabbitMQ,可以覆盖发布和创建通道的方法。
注意:这不是最佳实践,只是一个例子。
<?php namespace App\Queue; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue; class RabbitMQQueue extends BaseRabbitMQQueue { protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void { try { parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket); } } protected function publishBatch($jobs, $data = '', $queue = null): void { try { parent::publishBatch($jobs, $data, $queue); } catch (AMQPConnectionClosedException|AMQPChannelClosedException) { $this->reconnect(); parent::publishBatch($jobs, $data, $queue); } } protected function createChannel(): AMQPChannel { try { return parent::createChannel(); } catch (AMQPConnectionClosedException) { $this->reconnect(); return parent::createChannel(); } } }
默认队列
当 Laravel 未提供队列时,连接将使用默认队列,其值为 'default'。您可以通过在连接配置中添加额外参数来更改默认队列。
'connections' => [ // ... 'rabbitmq' => [ // ... 'queue' => env('RABBITMQ_QUEUE', 'default'), ], // ... ],
心跳
默认情况下,您的连接将创建为一个带有心跳设置 0
的连接。您可以通过更改配置来修改心跳设置。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ // ... 'heartbeat' => 10, ], ], // ... ],
SSL 安全
如果您需要与 RabbitMQ 服务器建立安全连接,您需要添加这些额外的配置选项。
'connections' => [ // ... 'rabbitmq' => [ // ... 'secure' = > true, 'options' => [ // ... 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], ], ], // ... ],
数据库提交后的事件
指导 Laravel 工作者在所有数据库提交完成后分发事件。
'connections' => [ // ... 'rabbitmq' => [ // ... 'after_commit' => true, ], // ... ],
懒加载连接
默认情况下,您的连接将创建为一个懒加载连接。如果出于某些原因您不想使用懒加载连接,可以通过以下配置将其关闭。
'connections' => [ // ... 'rabbitmq' => [ // ... 'lazy' = > false, ], // ... ],
网络协议
默认情况下,用于连接的网络协议是 tcp。如果出于某些原因您想使用其他网络协议,您可以在配置选项中添加额外的值。支持的协议:tcp
、ssl
、tls
'connections' => [ // ... 'rabbitmq' => [ // ... 'network_protocol' => 'tcp', ], // ... ],
Octane 支持
从 13.3.0 版本开始,此软件包默认支持 Laravel Octane。首先安装 Octane,并别忘了在 octane 配置中预热 'rabbitmq' 连接。
Laravel 使用
完成配置后,您可以使用 Laravel 队列 API。如果您使用了其他队列驱动,您不需要更改其他任何内容。如果您不知道如何使用队列 API,请参阅官方 Laravel 文档:https://laravel.net.cn/docs/queues
Lumen 使用
对于 Lumen 使用,应在 bootstrap/app.php
中手动注册服务提供者,如下所示。
$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);
消费消息
有两种方式可以消费消息。
-
queue:work
命令是 Laravel 内置的命令。此命令使用basic_get
。如果您想消费多个队列,请使用此命令。 -
由本软件包提供的
rabbitmq:consume
命令。此命令使用basic_consume
,比basic_get
性能高出约 2 倍,但不支持多个队列。
测试
使用 docker-compose
设置 RabbitMQ
docker compose up -d
要运行测试套件,可以使用以下命令
# To run both style and unit tests. composer test # To run only style tests. composer test:style # To run only unit tests. composer test:unit
如果您收到样式测试的任何错误,您可以使用以下命令自动修复大多数,如果不是所有的问题
composer fix:style
贡献
您可以通过发现错误和打开问题来为此软件包做出贡献。请记住在您创建拉取请求或问题中添加软件包的版本。(例如:[5.2] 延迟作业上的致命错误)