金融科技-fab / laravel-queue-rabbitmq
Laravel Queue的RabbitMQ驱动。支持Laravel Horizon。
Requires
- php: ^7.3|^8.0
- ext-json: *
- illuminate/queue: ^8.0
- php-amqplib/php-amqplib: ^2.12|^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.17
- laravel/horizon: ^5.0
- mockery/mockery: ^1.0
- orchestra/testbench: ^6.0
- phpunit/phpunit: ^9.3
Suggests
- ext-pcntl: Required to use all features of the queue consumer.
- v11.3.0
- v11.2.0
- v11.1.2
- v11.1.1
- v11.1.0
- dev-master / 11.0.x-dev
- v11.0.2
- v11.0.1
- v11.0.0
- v10.2.3
- v10.2.2
- v10.2.1
- v10.2.0
- v10.1.3
- v10.1.2
- v10.1.1
- v10.1.0
- v10.0.x-dev
- v10.0.2
- v10.0.1
- v10.0.0
- v9.1.2
- v9.1.1
- v9.1.0
- v9.0.x-dev
- v9.0.0
- v8.3.0
- v8.2.0
- v8.1.0
- v8.0.x-dev
- v8.0.0
- v7.5.0
- v7.4.2
- v7.4.1
- v7.4.0
- v7.3.0
- v7.2.0
- v7.1.2
- v7.1.1
- v7.1.0
- v7.0.x-dev
- v7.0.2
- v7.0.1
- v7.0.0
- v6.0.x-dev
- v6.0.3
- v6.0.2
- v6.0.1
- v6.0.0
- v5.5.x-dev
- 5.5
- v5.4.x-dev
- 5.4
- v5.3.x-dev
- 5.3
- v5.2.x-dev
- 5.2
- v5.1.x-dev
- 5.1
- v5.0.x-dev
- 5.0
- v4.2.x-dev
- 4.2
- 4.1
- 4.0
- dev-feature/abbility-to-hook-into-the-created-connection
- dev-after-creating-connection-callback
- dev-feature/use-custom-job-class-v10
This package is not auto-updated.
Last update: 2022-02-01 12:31:00 UTC
README
支持策略
只有最新版本会获得新功能。修复bug将按照以下方案进行
包版本 | Laravel版本 | bug修复至 | |
---|---|---|---|
9 | 6 | 2021年10月5日 | 文档 |
10 | 6, 7 | 2021年10月5日 | 文档 |
11 | 8 | 2021年4月6日 | 文档 |
安装
您可以使用以下命令通过composer安装此包
composer require vladimir-yuldashev/laravel-queue-rabbitmq
此包将自动注册自身。
将连接添加到config/queue.php
'connections' => [ // ... 'rabbitmq' => [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], ], 'options' => [ 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], 'queue' => [ 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, ], ], /* * Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ], // ... ],
可选配置
可选地,将队列选项添加到连接的配置中。为该连接创建的每个队列都将获得这些属性。
当您想对延迟的消息进行优先级排序时,可以通过添加额外的选项来实现。
- 当省略max-priority时,使用时最大优先级设置为2。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'prioritize_delayed' => false, 'queue_max_priority' => 10, ], ], ], // ... ],
当您想针对带有routing-key的交换机发布消息时,可以通过添加额外的选项来实现。
- 当省略交换机时,RabbitMQ将使用
amq.direct
交换机进行路由键。 - 当省略路由键时,默认路由键是队列名称。
- 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用带有路由键的交换机时,您可能需要自己创建队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'exchange' => 'application-x', 'exchange_type' => 'topic', 'exchange_routing_key' => '', ], ], ], // ... ],
在Laravel中,失败的任务存储在数据库中。但也许您想指示其他进程也处理消息。当您想指示RabbitMQ将失败的消息重路由到交换机或特定队列时,可以通过添加额外的选项来实现。
- 当省略交换机时,RabbitMQ将使用
amq.direct
交换机进行路由键。 - 当省略路由键时,默认路由键将用队列名称替换为
'failed'
。 - 当在路由键中使用
%s
时,队列名称将被替换。
注意:当使用带有路由键的失败任务交换机时,您可能需要自己创建交换机/队列的绑定。
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'reroute_failed' => true, 'failed_exchange' => 'failed-exchange', 'failed_routing_key' => 'application-x.%s', ], ], ], // ... ],
使用自己的RabbitMQJob类
有时您必须处理另一个应用程序发布的消息。
这些消息可能不会遵守Laravel的任务负载模式。这些消息的问题在于,Laravel工作进程无法确定实际的任务或类来执行。
您可以通过在队列连接配置中定义自己的类来扩展内置的RabbitMQJob::class
。当您在配置中指定一个具有自己的类名的job
键时,从代理检索的每个消息都将被您的类封装。
配置示例
'connections' => [ // ... 'rabbitmq' => [ // ... 'options' => [ 'queue' => [ // ... 'job' => \App\Queue\Jobs\RabbitMQJob::class, ], ], ], // ... ],
您自己的任务类的示例
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Fire the job. * * @return void */ public function fire() { $payload = $this->payload(); $class = WhatheverClassNameToExecute::class; $method = 'handle'; ($this->instance = $this->resolve($class))->{$method}($this, $payload); $this->delete(); } }
或者,您可能想向负载中添加额外的属性
<?php namespace App\Queue\Jobs; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob; class RabbitMQJob extends BaseJob { /** * Get the decoded body of the job. * * @return array */ public function payload() { return [ 'job' => 'WhatheverFullyQualifiedClassNameToExecute@handle', 'data' => json_decode($this->getRawBody(), true) ]; } }
Laravel使用
一旦完成配置,您就可以使用Laravel队列API。如果您使用了其他队列驱动程序,您不需要更改任何其他内容。如果您不知道如何使用队列API,请参阅官方Laravel文档:https://laravel.net.cn/docs/queues
Laravel Horizon 使用
从8.0版本开始,此包默认支持Laravel Horizon。首先安装Horizon,然后设置RABBITMQ_WORKER
为horizon
。
Lumen 使用
对于Lumen使用,应该在bootstrap/app.php
中手动注册服务提供者。
$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);
消费消息
有两种方式可以消费消息。
-
queue:work
命令是Laravel的内置命令。此命令使用basic_get
。 -
由此包提供的
rabbitmq:consume
命令。此命令使用basic_consume
,比basic_get
性能高出约2倍。
测试
使用docker-compose
设置RabbitMQ
docker-compose up -d rabbitmq
要运行测试套件,可以使用以下命令
# To run both style and unit tests. composer test # To run only style tests. composer test:style # To run only unit tests. composer test:unit
如果您从样式测试中收到任何错误,可以使用以下命令自动修复大多数,如果不是所有的问题
composer fix:style
贡献
您可以通过发现错误并创建问题来为此包做出贡献。请说明您创建的pull request或问题针对的是该包的哪个版本。(例如:[5.2] 延迟作业上的致命错误)