luoxiaojun/laravel-queue-rabbitmq

RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.

v8.2.0 2019-05-14 18:30 UTC

README

Installation

You can install this package via composer using this command:

composer require luoxiaojun/laravel-queue-rabbitmq:dev-master -vvv

The package registers itself automatically through Laravel package auto-discovery.

Set up the connection in config/queue.php

'connections' => [
    // ...
    'rabbitmq' => [
        'driver' => 'rabbitmq',
    
        'worker' => 'default',
    
        'dsn' => env('RABBITMQ_DSN', null),
    
        /*
         * Can be any class that implements \Interop\Amqp\AmqpConnectionFactory, for example:
         *  - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Lib\Ext\AmqpConnectionFactory if you install enqueue/amqp-ext
         *  - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Lib\Amqp\AmqpConnectionFactory if you install enqueue/amqp-lib
         */
        'factory_class' => env('RABBITMQ_CONN_FACTORY', \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Lib\Ext\AmqpConnectionFactory::class),
    
        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
        'port' => env('RABBITMQ_PORT', 5672),
    
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'login' => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
    
        //For Aliyun
        'aliyun_access_key' => env('RABBITMQ_ALIYUN_AK'),
        'aliyun_access_secret' => env('RABBITMQ_ALIYUN_AS'),
        'aliyun_resouce_owner_id' => intval(env('RABBITMQ_ALIYUN_OWNER')),
        'max_delay' => intval(env('RABBITMQ_MAX_DELAY', 86400000)), //ms
        'delay_strategy' => \Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy::class, // or null for Aliyun
    
        'queue' => env('QUEUE_NAME', 'default'),
    
        'queue_name_prefix' => env('RABBITMQ_QUEUE_NAME_PREFIX'),
    
        'persisted' => boolval((int)env('RABBITMQ_PERSISTED', false)), //only for amqp-ext
    
        'options' => [
    
            'producer' => [
                'queueTopicExchangeMapping' => [
                    'test_queue' => [
                        'topic' => 'test_topic', // string, default is queue name
                        'exchange' => 'test_exchange', //string, default is global name or queue name
                    ],
                ],
    
                'retry_on_connect_error' => boolval((int)env('RABBITMQ_RETRY_ON_CONN_ERROR', true)),
                
                'aggQueue' => [
                    //
                ],
            ],
    
            'consumer' => [
                'queueTopicExchangeMapping' => [
                    'test_queue' => [
                        [
                            'topic' => 'test_topic', // string or array, default is queue name
                            'exchange' => 'test_exchange', //string, default is global name or queue name
                        ],
                    ],
                ],
                'queueJobMapping' => [
                    'test_queue' => [
                        '^test_topic$' => 'TestJob',
                    ],
                    'test_queue2' => 'TestJob',
                ],
            ],
    
            'exchange' => [
    
                'name' => env('RABBITMQ_EXCHANGE_NAME'),
    
                'exchange_name_prefix' => env('RABBITMQ_EXCHANGE_NAME_PREFIX'),
    
                'routing_key_prefix' => env('RABBITMQ_ROUTING_KEY_PREFIX'),
    
                /*
                * Determine if exchange should be created if it does not exist.
                */
                'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
    
                // Exchanges with the -delay suffix must also be configured separately
                'declareMapping' => [
                    'test_exchange' => false,
                ],
    
                /*
                * Read more about possible values at https://rabbitmq.cn/tutorials/amqp-concepts.html
                */
                'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_TOPIC),
                'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
    
                // Exchanges with the -delay suffix must also be configured separately
                'optionsMapping' => [
                    'test_exchange' => [
                        'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_TOPIC),
                        'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                        'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                        'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                        'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                    ],
                ],
            ],
    
            'queue' => [
    
                /*
                * Determine if queue should be created if it does not exist.
                */
                'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
    
                'declareMapping' => [
                    'test_queue' => false,
                ],
    
                /*
                * Determine if queue should be bound to the exchange created.
                */
                'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
    
                // Exchanges with the -delay suffix must also be configured separately
                'bindMapping' => [
                    'test_queue' => [
                        'test_exchange' => [
                            'test_topic' => true,
                        ],
                    ],
                ],
    
                /*
                * Read more about possible values at https://rabbitmq.cn/tutorials/amqp-concepts.html
                */
                'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
    
                'optionsMapping' => [
                    'test_queue' => [
                        'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                        'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                        'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                        'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                        'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                    ],
                ],
            ],
        ],
    
        /*
         * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
         * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
         */
        'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', false),
    
        /*
         * Optional SSL params if an SSL connection is used
         */
        'ssl_params' => [
            'ssl_on' => env('RABBITMQ_SSL', false),
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),
            'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
            'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
            'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
        ],
    ],
    // ...    
],
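
The 'dsn' entry above reads a single connection string from RABBITMQ_DSN. As a rough sketch, assuming the enqueue-style format amqp://login:password@host:port/vhost, such a string could be assembled from the individual settings as shown here. The helper name buildAmqpDsn is hypothetical and not part of this package, and this README does not state whether a DSN takes precedence over 'host', 'port' and the other keys, so verify that before relying on it.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): builds an AMQP DSN,
 * assuming the format amqp://login:password@host:port/vhost.
 */
function buildAmqpDsn(string $host, int $port, string $vhost, string $login, string $password): string
{
    // Percent-encode credentials and vhost so characters such as "/" or "@"
    // are not mistaken for URL delimiters; the default vhost "/" becomes "%2F".
    return sprintf(
        'amqp://%s:%s@%s:%d/%s',
        rawurlencode($login),
        rawurlencode($password),
        $host,
        $port,
        rawurlencode($vhost)
    );
}

// Same defaults as in the config above.
echo buildAmqpDsn('127.0.0.1', 5672, '/', 'guest', 'guest'), PHP_EOL;
// prints: amqp://guest:guest@127.0.0.1:5672/%2F
```

The resulting string would go into .env as the value of RABBITMQ_DSN.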

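Laravel Usage

Once the configuration is complete, you can use the Laravel Queue API. If you have used other queue drivers, nothing else needs to change. If you are not familiar with the Queue API, please refer to the official Laravel documentation: https://laravel.net.cn/docs/queues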
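
For readers new to the Queue API, here is a minimal sketch of a queued job sent through the 'rabbitmq' connection configured above. It uses only standard Laravel classes; the job name SendWelcomeEmail and its payload are hypothetical and exist purely for illustration.

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

// Hypothetical job, used only to illustrate dispatching via RabbitMQ.
class SendWelcomeEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** @var int */
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    // Runs on the worker once the message has been consumed.
    public function handle(): void
    {
        Log::info('Sending welcome email', ['user_id' => $this->userId]);
    }
}
```

Dispatching it from application code (a controller, for instance) might then look like this:

```php
\App\Jobs\SendWelcomeEmail::dispatch(42)
    ->onConnection('rabbitmq') // the connection name from config/queue.php
    ->onQueue('default');      // matches the 'queue' setting above
```

A worker for that connection can be started with the usual command: php artisan queue:work rabbitmq --queue=default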

Lumen Usage

For Lumen, the service provider must be registered manually in bootstrap/app.php, as follows:

$app->register(VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class);

Limitations

  1. The retry command is not supported.

Delay > 1d (Aliyun only)

[Figure: Delay > 1d]

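Using other AMQP transports

The package uses the enqueue/amqp-lib transport, which is based on php-amqplib. Any amqp interop compatible transport can be used instead, for example enqueue/amqp-ext or enqueue/amqp-bunny. Here is an example of how to change the transport to enqueue/amqp-bunny.

First, install the desired transport package: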

composer require enqueue/amqp-bunny:^0.8

Change the factory class in config/queue.php:

    // ...
    'connections' => [
        'rabbitmq' => [
            'driver' => 'rabbitmq',
            'factory_class' => Enqueue\AmqpBunny\AmqpConnectionFactory::class,
        ],
    ],

Testing

Set up RabbitMQ using docker-compose:

docker-compose up -d

Run the tests:

composer test

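Contributing

You can contribute to this package by discovering bugs and opening issues. Please state which package version your pull request or issue applies to (e.g. [5.2] Fatal error on delayed job).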