943626451/my-queue

基于 swoole 的异步队列库,支持灵活的多消费者,协程。

v1.1.4 2020-04-30 01:56 UTC

README

Build Status License Php version Version

一个基于 swoole 的异步队列库,支持灵活的多消费者,协程。 中文说明

queue_status.gif

特性

  • 默认 Redis 驱动器
  • 延迟任务
  • 自定义任务重试次数和时间
  • 自定义失败回调
  • 任务中间件
  • 队列快照事件
  • 并发处理,灵活的多工作者
  • 工作者协程支持
  • 美观的控制台

要求

  • PHP 7.2+
  • Swoole 4.4+
  • Redis 3.2+ (redis 驱动器)

安装

composer require littlesqx/aint-queue -vvv

用法

配置

默认情况下,aint-queue 将需要 config/aint-queue.php 作为默认配置。如果不存在,则 /vendor/littlesqx/aint-queue/src/Config/config.php 将是最终的配置文件。

<?php

use Littlesqx\AintQueue\Driver\Redis\Queue as RedisQueue;
use Littlesqx\AintQueue\Logger\DefaultLogger;

return [
    // channel_name => [...config]
    'default' => [
        'driver' => [
            'class' => RedisQueue::class,
            'connection' => [
                'host' => '127.0.0.1',
                'port' => 6379,
                'database' => '0',
                // 'password' => 'password',
            ],
        ],
        'logger' => [
            'class' => DefaultLogger::class,
            'options' => [
                'level' => \Monolog\Logger::DEBUG,
            ],
        ],
        'pid_path' => '/var/run/aint-queue',
        'consumer' => [
            'sleep_seconds' => 1,
            'memory_limit' => 96,
            'dynamic_mode' => true,
            'capacity' => 6,
            'flex_interval' => 5 * 60,
            'min_worker_number' => 5,
            'max_worker_number' => 30,
            'max_handle_number' => 0,
        ],
        'job_snapshot' => [
            'interval' => 5 * 60,
            'handler' => [],
        ],
    ],
];

所有选项

推送任务

您可以在通过 fpm/cli 运行的项目中使用它。

<?php

use Littlesqx\AintQueue\Driver\DriverFactory;

$queue = DriverFactory::make($channel, $options);

// push a job
$queue->push(function () {
    echo "Hello aint-queue\n";
});

// push a delay job
$closureJob = function () {
    echo "Hello aint-queue delayed\n";
};
$queue->push($closureJob, 5);

// And class job are allowed.
// 1. Create a class which implements JobInterface, you can see the example in `/example`.
// 2. Noted that job pushed should be un-serialize by queue-listener,
//    it means queue-pusher and queue-listener are required to in the same project.                                          
// 3. You can see more examples in `example` directory.

管理队列

我们建议使用 Supervisor 监控和控制监听器。

vendor/bin/aint-queue
AintQueue Console Tool

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  help                 Displays help for a command
  list                 Lists commands
 queue
  queue:clear          Clear the queue.
  queue:dashboard      Start http server for dashboard.
  queue:reload-failed  Reload all the failed jobs onto the waiting queue.
  queue:status         Get the execute status of specific queue.
 worker
  worker:listen        Listen the queue.
  worker:reload        Reload worker for the queue.
  worker:run           Run the specific job.
  worker:stop          Stop listening the queue.

测试

composer test

贡献

您可以通过以下三种方式之一进行贡献

  1. 使用 问题跟踪器 提交错误报告。
  2. 问题跟踪器 上回答问题或修复错误。
  3. 贡献新功能或更新 wiki。

代码贡献流程并不非常正式。您只需要确保您遵循 PSR-2、PSR-12 编码指南。任何新的代码贡献都必须附带相应的单元测试(如果适用)。

许可

MIT