avto-dev / amqp-rabbit-laravel-queue
RabbitMQ Laravel 队列驱动程序
Requires
- php: ^8.1
- ext-amqp: *
- ext-json: *
- avto-dev/amqp-rabbit-manager: ^2.10
- illuminate/container: ^10.0 || ^11.0
- illuminate/contracts: ^10.0 || ^11.0
- illuminate/queue: ^10.0 || ^11.0
- illuminate/support: ^10.0 || ^11.0
- symfony/console: ^6.0 || ^7.0
Requires (Dev)
- laravel/laravel: ^10.0 || ^11.0
- mockery/mockery: ^1.6
- nesbot/carbon: ^2.66
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
- symfony/process: ^6.0 || ^7.0
README
RabbitMQ 基于的 Laravel 队列驱动程序
此包允许使用 RabbitMQ 队列来处理 Laravel(优先级)作业。完全可配置。
需要安装 PHP 扩展 ext-amqp
。安装步骤可以在 Dockerfile 中找到。
对于需要延迟的作业,还应安装 RabbitMQ 的 rabbitmq-delayed-message-exchange
插件。延迟是可选功能。
安装
重要:在开始使用此包之前,您应该将
avto-dev/amqp-rabbit-manager
安装到您的应用程序中。安装步骤可以在 此处 找到。
使用以下命令通过 composer 安装此包
$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"
需要安装
composer
(如何安装 composer)。您还需要解决包的主要版本问题。
您需要解决包的主要版本问题。
之后,您应该修改配置文件
./config/rabbitmq.php
RabbitMQ 队列和交换配置
<?php use Interop\Amqp\AmqpQueue; use Interop\Amqp\AmqpTopic; return [ // ... 'queues' => [ 'jobs' => [ 'name' => env('JOBS_QUEUE_NAME', 'jobs'), 'flags' => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts 'arguments' => [ 'x-max-priority' => 255, // @link <https://rabbitmq.cn/priority.html> ], 'consumer_tag' => null, ], 'failed' => [ 'name' => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'), 'flags' => AmqpQueue::FLAG_DURABLE, 'arguments' => [ 'x-message-ttl' => 604800000, // 7 days, @link <https://rabbitmq.cn/ttl.html> 'x-queue-mode' => 'lazy', // @link <https://rabbitmq.cn/lazy-queues.html> ], 'consumer_tag' => null, ], ], // ... 'exchanges' => [ // RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>) 'delayed-jobs' => [ 'name' => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'), 'type' => 'x-delayed-message', 'flags' => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts 'arguments' => [ 'x-delayed-type' => AmqpTopic::TYPE_DIRECT, ], ], ], // ... 'setup' => [ 'rabbit-default' => [ 'queues' => [ 'jobs', 'failed', ], 'exchanges' => [ 'delayed-jobs' ], ], ], ];
./config/queue.php
Laravel 队列设置
<?php use AvtoDev\AmqpRabbitLaravelQueue\Connector; return [ // ... 'default' => env('QUEUE_DRIVER', 'rabbitmq'), // ... 'connections' => [ // ... 'rabbitmq' => [ 'driver' => Connector::NAME, 'connection' => 'rabbit-default', 'queue_id' => 'jobs', 'delayed_exchange_id' => 'delayed-jobs', 'timeout' => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds 'resume' => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over ], ], // ... 'failed' => [ 'connection' => 'rabbit-default', 'queue_id' => 'failed', ], ];
resume
可以与非零timeout
值一起使用,用于周期性连接重载 (例如,如果您设置'timeout' => 30000
和'resume' => true
,队列工作进程将在 30 秒内取消订阅并重新订阅队列,而无需退出进程)。
您可以通过删除 delayed_exchange_id
来禁用延迟作业功能。
最后,别忘了执行命令 php ./artisan rabbit:setup
。
延迟作业如何工作?
非常简单
用法
您可以使用常规方式调度作业(dispatch(new Job)
或 dispatch(new Job)->delay(10)
),如 queue:work
、queue:failed
、queue:retry
等命令都能正常工作。
附加功能
- 作业延迟(需要 RabbitMQ 服务器上的
rabbitmq_delayed_message_exchange
插件); - 作业优先级(作业应实现
PrioritizedJobInterface
接口); - 自动延迟消息交换绑定(仅当您使用
rabbit:setup
命令创建队列和交换时); - 存储
job
状态的能力
状态存储
使用此包,您可以在作业重启之间存储任何变量(除了资源和可调用实体)(只需在您的作业类中使用 trait WithJobStateTrait
)。但您应该记住 - 状态仅在作业 handle
方法内部可用。
消费者自定义标签前缀
每个消费者都有一个标识符,客户端库使用该标识符来确定针对给定投递调用哪个处理程序。它们的名称因协议而异。消费者标签和订阅 ID 是最常用的两个术语。
如果您想为消费者标签添加自定义前缀,您可以在 AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct
方法中指定它。
⚠️ 警告
小心使用 queue:failed
和 queue:retry
命令。如果在命令执行期间发生某些情况(丢失连接等),您可能会丢失所有失败的作业!
您应该避免使用以下方法 (代理不保证操作顺序,因此调用结果可能不正确)
\AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
\AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()
测试
对于包测试,我们使用 phpunit
框架以及 docker-ce
+ docker-compose
作为开发环境。所以,在克隆仓库后,只需在您的终端中输入以下命令
$ make build $ make latest # or 'make lowest' $ make test
变更日志
变更日志可以在此处找到。
支持
如果您发现任何包错误,请在该仓库中提交一个issue。
许可证
这是一个开源软件,受MIT许可证许可。