littlesqx/aint-queue

此包已被废弃,不再维护。未建议替代包。

基于swoole构建的异步队列库,支持灵活的多消费者,协程。

v1.2.2 2020-06-01 03:46 UTC

README

Build Status License Php version Version

基于swoole构建的异步队列库,支持灵活的多消费者,协程。 中文说明

queue_status.gif

特性

  • 默认Redis驱动
  • 延迟任务
  • 自定义任务重试次数和时间
  • 自定义失败回调
  • 任务中间件
  • 队列快照事件
  • 并发处理,灵活的多工作者
  • 工作者协程支持
  • 美观的仪表板

要求

  • PHP 7.2+
  • Swoole 4.4+
  • Redis 3.2+ (redis驱动)

安装

composer require littlesqx/aint-queue -vvv

使用

配置

默认情况下,aint-queue将需要config/aint-queue.php作为默认配置。如果不存在,则/vendor/littlesqx/aint-queue/src/Config/config.php将是最终的配置文件。

<?php

use Littlesqx\AintQueue\Driver\Redis\Queue as RedisQueue;
use Littlesqx\AintQueue\Logger\DefaultLogger;

return [
    // channel_name => [...config]
    'default' => [
        'driver' => [
            'class' => RedisQueue::class,
            'connection' => [
                'host' => '127.0.0.1',
                'port' => 6379,
                'database' => '0',
                // 'password' => 'password',
            ],
        ],
        'logger' => [
            'class' => DefaultLogger::class,
            'options' => [
                'level' => \Monolog\Logger::DEBUG,
            ],
        ],
        'pid_path' => '/var/run/aint-queue',
        'consumer' => [
            'sleep_seconds' => 1,
            'memory_limit' => 96,
            'dynamic_mode' => true,
            'capacity' => 6,
            'flex_interval' => 5 * 60,
            'min_worker_number' => 5,
            'max_worker_number' => 30,
            'max_handle_number' => 0,
        ],
        'job_snapshot' => [
            'interval' => 5 * 60,
            'handler' => [],
        ],
    ],
];

所有选项

名称 类型 注释 默认值
channel 字符串 队列单元,每个队列推送者和队列监听器都在其上工作。支持多个通道,使用--channel选项。 默认值
driver.class 字符串 队列驱动类,实现了QueueInterface。 Redis
driver.connection 映射 队列驱动器的配置。
pid_path 字符串 监听器主进程pid文件的路径。请注意,需要权限。 /var/run/aint-queue
consumer.sleep_seconds 整数 每次从队列中弹出空数据后的睡眠秒数。 1
consumer.memory_limit 整数 Mb。当工作进程的内存使用超过限制时,工作进程将重新加载。 96
consumer.dynamic_mode 布尔值 确定工作进程的数量是否灵活地动态调整。 true
consumer.capacity 整数 每个消费者在健康和短时间内可以处理的能力,它影响动态模式时的工作者数量。 5
consumer.flex_interval 整数 flex_interval秒监控进程尝试调整工作者数量。只有在consumer.dynamic_mode = true时才工作。 5
consumer.min_worker_number 整数 最小扩展。 5
consumer.max_worker_number 整数 最大扩展。 30
consumer.max_handle_number 整数 当前消费者的最大任务处理时间。0表示没有限制。 0
job_snapshot 映射 interval秒执行一次handles。处理必须实现JobSnapshotterInterface。

推送任务

您可以在通过fpm/cli运行的项目中使用它。

<?php

use Littlesqx\AintQueue\Driver\DriverFactory;

$queue = DriverFactory::make($channel, $options);

// push a job
$queue->push(function () {
    echo "Hello aint-queue\n";
});

// push a delay job
$closureJob = function () {
    echo "Hello aint-queue delayed\n";
};
$queue->push($closureJob, 5);

// And class job are allowed.
// 1. Create a class which implements JobInterface, you can see the example in `/example`.
// 2. Noted that job pushed should be un-serialize by queue-listener,
//    it means queue-pusher and queue-listener are required to in the same project.                                          
// 3. You can see more examples in `example` directory.

管理队列

我们建议使用Supervisor来监控和控制监听器。

vendor/bin/aint-queue
AintQueue Console Tool

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  help                 Displays help for a command
  list                 Lists commands
 queue
  queue:clear          Clear the queue.
  queue:dashboard      Start http server for dashboard.
  queue:reload-failed  Reload all the failed jobs onto the waiting queue.
  queue:status         Get the execute status of specific queue.
 worker
  worker:listen        Listen the queue.
  worker:reload        Reload worker for the queue.
  worker:run           Run the specific job.
  worker:stop          Stop listening the queue.

测试

composer test

贡献

您可以通过以下三种方式之一进行贡献

  1. 使用问题跟踪器提交错误报告。
  2. 问题跟踪器上回答问题或修复错误。
  3. 贡献新功能或更新wiki。

代码贡献过程并不十分正式。您只需确保遵循PSR-2、PSR-12编码规范。任何新的代码贡献必须附带相应的单元测试(如有适用)。

许可证

MIT