vnay92 / laravel-custom-queue
A message queue handler for non-Laravel jobs. Useful when Laravel has to communicate with external systems.
Requires
- laravel/framework: ^5.1
- php-amqplib/php-amqplib: ^2.6
This package is not auto-updated.
Last update: 2024-09-29 05:48:26 UTC
README
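Laravel Custom Queue Handler is a simple implementation for handling queues and messages that originate outside the Laravel framework.
Sometimes your application needs to consume messages in JSON format that arrive on a queue from a producer that does not implement Laravel jobs.
This package is intended to cover that use case.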
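As an illustration of the kind of message meant here, the sketch below decodes a plain JSON body into the associative array that a handler would work with. It is a minimal sketch in plain PHP 7+, not the package's own code: the `decodeMessage()` helper and the `data` field are hypothetical, while `source` and `userId` mirror the keys read by the example handler later in this README.

```php
<?php

// Hypothetical body, as an external (non-Laravel) producer might publish it.
// "source" and "userId" mirror the example handler in this README;
// "data" is an invented field for illustration only.
$rawBody = '{"source":"mail_service","userId":42,"data":{"subject":"Welcome"}}';

/**
 * Decode a raw JSON message body into an associative array.
 * Hypothetical helper, not part of the package.
 *
 * @throws InvalidArgumentException when the body is not a JSON object or array
 */
function decodeMessage(string $rawBody): array
{
    // Passing true as the second argument returns arrays instead of stdClass objects.
    $message = json_decode($rawBody, true);

    if (!is_array($message)) {
        throw new InvalidArgumentException('Message body must be a JSON object or array');
    }

    return $message;
}

$message = decodeMessage($rawBody);

echo $message['source'], PHP_EOL; // prints "mail_service"
```

Unlike a serialized Laravel job, such a payload carries no class name, so the consumer has to be told which class should process it.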
It supports the same commands as Laravel's own queue, with the same arguments, plus one additional --handler option:
- custom-queue:work --handler="Class\Path\To\Handler"
- custom-queue:listen --handler="Class\Path\To\Handler"
- custom-queue:restart
Currently supported queues
- RabbitMQ
Laravel version compatibility
- Laravel 4.x is not supported.
- Laravel 5.x.x is supported from version 5.1 onward, with each version on its own branch.
Installation
Laravel 5.x
Install the vnay92/laravel-custom-queue package:
$ composer require vnay92/laravel-custom-queue
Then register the service provider in config/app.php:
'providers' => array(
    // ...
    Vnay92\CustomQueue\CustomQueueServiceProvider::class,
)
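Configuration
Publish the custom queue configuration file (config/custom-queue.php):
$ php artisan vendor:publish --provider="Vnay92\CustomQueue\CustomQueueServiceProvider"
Then add the following properties to your .env file, with the correct values for your environment: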
CUSTOM_QUEUE_DRIVER=rabbitmq
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_QUEUE=queue_name
The configuration file uses these settings as follows:
<?php

/* path/to/project/config/custom-queue.php */

return [

    /*
    |--------------------------------------------------------------------------
    | Default Queue Driver
    |--------------------------------------------------------------------------
    |
    | The API, based on the Laravel queue API, supports a variety of back-ends
    | via a unified API, giving you convenient access to each back-end using
    | the same syntax for each one. Here you may set the default queue driver.
    |
    | Supported: "rabbitmq"
    |
    */

    'default' => env('CUSTOM_QUEUE_DRIVER', 'rabbitmq'),

    /*
    |--------------------------------------------------------------------------
    | Queue Connections
    |--------------------------------------------------------------------------
    |
    | Here you may configure the connection information for each server that
    | is used by your application. A default configuration has been added
    | for each back-end shipped with Laravel. You are free to add more.
    |
    */

    'connections' => [

        'rabbitmq' => [
            'driver'   => 'rabbitmq',

            'host'     => env('RABBITMQ_HOST', 'localhost'),
            'port'     => env('RABBITMQ_PORT', 5672),

            'vhost'    => env('RABBITMQ_VHOST', '/'),
            'login'    => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),

            // name of the default queue
            'queue' => env('RABBITMQ_QUEUE', 'custom_default'),

            // create the exchange if it does not exist
            'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

            // create the queue if it does not exist and bind it to the exchange
            'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

            'queue_params' => [
                'passive'     => env('RABBITMQ_QUEUE_PASSIVE', false),
                'durable'     => env('RABBITMQ_QUEUE_DURABLE', true),
                'exclusive'   => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
            ],

            'exchange_params' => [
                'name' => env('RABBITMQ_EXCHANGE_NAME', null),

                // more info at https://rabbitmq.cn/tutorials/amqp-concepts.html
                'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),

                'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),

                // the exchange will survive server restarts
                'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),

                'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            ],
        ],

    ],

    /*
    |--------------------------------------------------------------------------
    | Failed Queue Jobs
    |--------------------------------------------------------------------------
    |
    | These options configure the behavior of failed queue job logging so you
    | can control which database and table are used to store the jobs that
    | have failed. You may change them to any database / table you wish.
    |
    */

    'failed' => [
        'database' => 'mysql',
        'table'    => 'failed_jobs',
    ],

];
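Every value in the configuration above goes through env('NAME', default), so a variable missing from .env falls back to the default given in the file. The sketch below approximates that fallback in plain PHP 7+ so it can be run without Laravel. envValue() is a hypothetical stand-in, not Laravel's env() helper; it only mimics the documented conversion of the strings "true", "false" and "null".

```php
<?php

/**
 * Hypothetical stand-in for Laravel's env() helper, for illustration only.
 * Returns $default when the variable is unset and converts the literal
 * strings "true", "false" and "null" to their PHP values.
 */
function envValue(string $key, $default = null)
{
    $value = getenv($key);

    if ($value === false) {
        return $default; // variable is not set: use the default from the config file
    }

    switch (strtolower($value)) {
        case 'true':
            return true;
        case 'false':
            return false;
        case 'null':
            return null;
    }

    return $value; // everything else stays a string
}

// Simulate a partial .env: host and durability are set, the port is not.
putenv('RABBITMQ_HOST=10.0.0.5');
putenv('RABBITMQ_QUEUE_DURABLE=false');
putenv('RABBITMQ_PORT'); // no "=" removes the variable

$connection = [
    'host'    => envValue('RABBITMQ_HOST', 'localhost'),
    'port'    => envValue('RABBITMQ_PORT', 5672),
    'durable' => envValue('RABBITMQ_QUEUE_DURABLE', true),
];

var_export($connection);
```

Here host resolves to '10.0.0.5', port falls back to the integer 5672, and durable becomes the boolean false. Note that a port set through the environment would arrive as a string.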
Example
Create a handler class that implements Vnay92\Contracts\HandlerInterface and define its handle() method.
<?php

namespace App\Services\ExternalMessagesService;

use App\Utility\UserAuth;

/**
 * Class ExternalMessagesService
 *
 * @package App\Services\ExternalMessagesService
 */
class ExternalMessagesService
{
    /**
     * @var array
     */
    private $serviceTableMap = [];

    /**
     * @var UserAuth
     */
    private $userAuth;

    /**
     * ExternalMessagesService constructor.
     *
     * @param UserAuth $userAuth
     */
    public function __construct(UserAuth $userAuth)
    {
        $this->userAuth = $userAuth;
        $this->serviceTableMap = [
            'mail_service' => \App::make(StoreService::class),
        ];
    }

    /**
     * Handle External Service messages.
     *
     * @param array $message
     *
     * @return void
     * @throws \Exception
     */
    public function handle(array $message)
    {
        $source = array_get($message, 'source');

        if (!isset($this->serviceTableMap[$source])) {
            \Log::error('[EXTERNAL_SERVICE] Event Not Recognised: ', [$source]);
            return;
        }

        // set seller context
        if ($this->userAuth->login(null, $message['userId'])) {
            // Handle the data in the respective class.
            return $this->serviceTableMap[$message['source']]->handle($message);
        }

        return;
    }
}
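The handler above routes each message by its source key and skips sources it does not recognise. The same pattern can be sketched without Laravel so that it runs standalone. Everything in this block is hypothetical: MessageHandler is a local stand-in for the package's handler contract (whose exact definition is not reproduced here), and MailServiceHandler and SourceRouter are invented names.

```php
<?php

// Local stand-in for the package's handler contract (an assumption, see above).
interface MessageHandler
{
    public function handle(array $message);
}

// Hypothetical downstream handler that just records what it receives.
final class MailServiceHandler implements MessageHandler
{
    /** @var array[] messages handled so far */
    public $handled = [];

    public function handle(array $message)
    {
        $this->handled[] = $message;
    }
}

// Routes a message to the handler registered for its "source" key.
final class SourceRouter implements MessageHandler
{
    /** @var MessageHandler[] handlers keyed by source name */
    private $handlers;

    /** @var array sources that had no registered handler */
    public $skipped = [];

    public function __construct(array $handlers)
    {
        $this->handlers = $handlers;
    }

    public function handle(array $message)
    {
        $source = isset($message['source']) ? $message['source'] : null;

        if ($source === null || !isset($this->handlers[$source])) {
            // The README example logs an error here; this sketch records the source instead.
            $this->skipped[] = $source;
            return;
        }

        $this->handlers[$source]->handle($message);
    }
}

$mail = new MailServiceHandler();
$router = new SourceRouter(['mail_service' => $mail]);

$router->handle(['source' => 'mail_service', 'userId' => 42]);
$router->handle(['source' => 'unknown_service']);

echo count($mail->handled), ' handled, ', count($router->skipped), ' skipped', PHP_EOL;
// prints "1 handled, 1 skipped"
```

Keeping the routing table in one class means a new external source only needs a new entry in the map, not a new queue worker.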
Then start listening on the queue as follows:
$ php artisan custom-queue:work --handler="App\Services\ExternalMessagesService\ExternalMessagesService" --queue=custom_queue
Contributing
Dependencies are managed with Composer:
$ composer install
Todo
- Provide more queue connectors
- Add tests with PHPUnit
- Add DocBlocks for all methods and members
- Support every Laravel version