avto-dev/amqp-rabbit-laravel-queue

RabbitMQ Laravel 队列驱动程序

v2.9.0 2024-04-26 10:14 UTC

README

Laravel

RabbitMQ 基于的 Laravel 队列驱动程序

Version PHP Version Build Status Coverage Downloads count License

此包允许使用 RabbitMQ 队列来处理 Laravel(优先级)作业。完全可配置。

需要安装 PHP 扩展 ext-amqp。安装步骤可以在 Dockerfile 中找到。

对于需要延迟的作业,还应安装 RabbitMQ 的 rabbitmq-delayed-message-exchange 插件。延迟是可选功能。

安装

重要:在开始使用此包之前,您应该将 avto-dev/amqp-rabbit-manager 安装到您的应用程序中。安装步骤可以在 此处 找到。

使用以下命令通过 composer 安装此包

$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"

需要安装 composer如何安装 composer)。您还需要解决包的主要版本问题。

您需要解决包的主要版本问题。

之后,您应该修改配置文件

./config/rabbitmq.php

RabbitMQ 队列和交换配置

<?php

use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;

return [

    // ...

    'queues' => [

        'jobs' => [
            'name'         => env('JOBS_QUEUE_NAME', 'jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts
            'arguments'    => [
                'x-max-priority' => 255, // @link <https://rabbitmq.cn/priority.html>
            ],
            'consumer_tag' => null,
        ],

        'failed' => [
            'name'         => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'),
            'flags'        => AmqpQueue::FLAG_DURABLE,
            'arguments'    => [
                'x-message-ttl' => 604800000, // 7 days, @link <https://rabbitmq.cn/ttl.html>
                'x-queue-mode'  => 'lazy', // @link <https://rabbitmq.cn/lazy-queues.html>
            ],
            'consumer_tag' => null,
        ],

    ],

    // ...

    'exchanges' => [

        // RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>)
        'delayed-jobs' => [
            'name'      => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'),
            'type'      => 'x-delayed-message',
            'flags'     => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts
            'arguments' => [
                'x-delayed-type' => AmqpTopic::TYPE_DIRECT,
            ],
        ],

    ],

    // ...

    'setup' => [
        'rabbit-default' => [
            'queues' => [
                'jobs',
                'failed',
            ],
            'exchanges' => [
                'delayed-jobs'
            ],
        ],
    ],
];

./config/queue.php

Laravel 队列设置

<?php

use AvtoDev\AmqpRabbitLaravelQueue\Connector;

return [

    // ...

    'default' => env('QUEUE_DRIVER', 'rabbitmq'),

    // ...

    'connections' => [

        // ...

        'rabbitmq' => [
            'driver'              => Connector::NAME,
            'connection'          => 'rabbit-default',
            'queue_id'            => 'jobs',
            'delayed_exchange_id' => 'delayed-jobs',
            'timeout'             => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds
            'resume'              => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over
        ],
    ],

    // ...

    'failed' => [
        'connection' => 'rabbit-default',
        'queue_id'   => 'failed',
    ],
];

resume 可以与非零 timeout 值一起使用,用于周期性连接重载 (例如,如果您设置 'timeout' => 30000'resume' => true,队列工作进程将在 30 秒内取消订阅并重新订阅队列,而无需退出进程)

您可以通过删除 delayed_exchange_id 来禁用延迟作业功能。

最后,别忘了执行命令 php ./artisan rabbit:setup

延迟作业如何工作?

非常简单

用法

您可以使用常规方式调度作业(dispatch(new Job)dispatch(new Job)->delay(10)),如 queue:workqueue:failedqueue:retry 等命令都能正常工作。

附加功能

  • 作业延迟(需要 RabbitMQ 服务器上的 rabbitmq_delayed_message_exchange 插件);
  • 作业优先级(作业应实现 PrioritizedJobInterface 接口);
  • 自动延迟消息交换绑定(仅当您使用 rabbit:setup 命令创建队列和交换时);
  • 存储 job 状态的能力

状态存储

使用此包,您可以在作业重启之间存储任何变量(除了资源和可调用实体)(只需在您的作业类中使用 trait WithJobStateTrait)。但您应该记住 - 状态仅在作业 handle 方法内部可用。

消费者自定义标签前缀

每个消费者都有一个标识符,客户端库使用该标识符来确定针对给定投递调用哪个处理程序。它们的名称因协议而异。消费者标签和订阅 ID 是最常用的两个术语。

如果您想为消费者标签添加自定义前缀,您可以在 AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct 方法中指定它。

⚠️ 警告

小心使用 queue:failedqueue:retry 命令。如果在命令执行期间发生某些情况(丢失连接等),您可能会丢失所有失败的作业!

您应该避免使用以下方法 (代理不保证操作顺序,因此调用结果可能不正确)

  • \AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
  • \AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()

测试

对于包测试,我们使用 phpunit 框架以及 docker-ce + docker-compose 作为开发环境。所以,在克隆仓库后,只需在您的终端中输入以下命令

$ make build
$ make latest # or 'make lowest'
$ make test

变更日志

Release date Commits since latest release

变更日志可以在此处找到

支持

Issues Issues

如果您发现任何包错误,请在该仓库中提交一个issue

许可证

这是一个开源软件,受MIT许可证许可。