littlesqx / aint-queue
此包已被废弃,不再维护。未建议替代包。
基于swoole构建的异步队列库,支持灵活的多消费者,协程。
v1.2.2
2020-06-01 03:46 UTC
Requires
- php: >=7.2
- ext-json: *
- ext-swoole: >=4.4
- illuminate/pipeline: ^6.16
- jeremeamia/superclosure: ^2.4
- monolog/monolog: ^2.0
- predis/predis: ^1.1
- psr/log: ^1.1
- symfony/console: ^4.4 | ^5.0
Requires (Dev)
- phpunit/phpunit: ^8.3
- swoole/ide-helper: @dev
Suggests
- php: >=7.3
- dev-master
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.4
- v1.1.3
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.2
- v1.0.1
- v1.0.0
- v0.9.2
- v0.9.1
- v0.9.0
- v0.8.2
- v0.8.1
- v0.8.0
- v0.7.2
- v0.7.1
- v0.7.0
- v0.6.0
- dev-optimize/failed-job-storage
- dev-pref/connection
- dev-fix/coroutine-exception-catching
- dev-feature/dashboard
- dev-feature/job-middleware
- dev-master-dev
- dev-pref/logger
- dev-pref/docs
This package is auto-updated.
Last update: 2023-06-04 08:41:35 UTC
README
基于swoole构建的异步队列库,支持灵活的多消费者,协程。 中文说明
特性
- 默认Redis驱动
- 延迟任务
- 自定义任务重试次数和时间
- 自定义失败回调
- 任务中间件
- 队列快照事件
- 并发处理,灵活的多工作者
- 工作者协程支持
- 美观的仪表板
要求
- PHP 7.2+
- Swoole 4.4+
- Redis 3.2+ (redis驱动)
安装
composer require littlesqx/aint-queue -vvv
使用
配置
默认情况下,aint-queue将需要config/aint-queue.php
作为默认配置。如果不存在,则/vendor/littlesqx/aint-queue/src/Config/config.php
将是最终的配置文件。
<?php use Littlesqx\AintQueue\Driver\Redis\Queue as RedisQueue; use Littlesqx\AintQueue\Logger\DefaultLogger; return [ // channel_name => [...config] 'default' => [ 'driver' => [ 'class' => RedisQueue::class, 'connection' => [ 'host' => '127.0.0.1', 'port' => 6379, 'database' => '0', // 'password' => 'password', ], ], 'logger' => [ 'class' => DefaultLogger::class, 'options' => [ 'level' => \Monolog\Logger::DEBUG, ], ], 'pid_path' => '/var/run/aint-queue', 'consumer' => [ 'sleep_seconds' => 1, 'memory_limit' => 96, 'dynamic_mode' => true, 'capacity' => 6, 'flex_interval' => 5 * 60, 'min_worker_number' => 5, 'max_worker_number' => 30, 'max_handle_number' => 0, ], 'job_snapshot' => [ 'interval' => 5 * 60, 'handler' => [], ], ], ];
所有选项
名称 | 类型 | 注释 | 默认值 |
---|---|---|---|
channel | 字符串 | 队列单元,每个队列推送者和队列监听器都在其上工作。支持多个通道,使用--channel 选项。 |
默认值 |
driver.class | 字符串 | 队列驱动类,实现了QueueInterface。 | Redis |
driver.connection | 映射 | 队列驱动器的配置。 | |
pid_path | 字符串 | 监听器主进程pid文件的路径。请注意,需要权限。 | /var/run/aint-queue |
consumer.sleep_seconds | 整数 | 每次从队列中弹出空数据后的睡眠秒数。 | 1 |
consumer.memory_limit | 整数 | Mb。当工作进程的内存使用超过限制时,工作进程将重新加载。 | 96 |
consumer.dynamic_mode | 布尔值 | 确定工作进程的数量是否灵活地动态调整。 | true |
consumer.capacity | 整数 | 每个消费者在健康和短时间内可以处理的能力,它影响动态模式时的工作者数量。 | 5 |
consumer.flex_interval | 整数 | 每flex_interval 秒监控进程尝试调整工作者数量。只有在consumer.dynamic_mode = true时才工作。 |
5 |
consumer.min_worker_number | 整数 | 最小扩展。 | 5 |
consumer.max_worker_number | 整数 | 最大扩展。 | 30 |
consumer.max_handle_number | 整数 | 当前消费者的最大任务处理时间。0 表示没有限制。 |
0 |
job_snapshot | 映射 | 每interval 秒执行一次handles 。处理必须实现JobSnapshotterInterface。 |
推送任务
您可以在通过fpm/cli运行的项目中使用它。
<?php use Littlesqx\AintQueue\Driver\DriverFactory; $queue = DriverFactory::make($channel, $options); // push a job $queue->push(function () { echo "Hello aint-queue\n"; }); // push a delay job $closureJob = function () { echo "Hello aint-queue delayed\n"; }; $queue->push($closureJob, 5); // And class job are allowed. // 1. Create a class which implements JobInterface, you can see the example in `/example`. // 2. Noted that job pushed should be un-serialize by queue-listener, // it means queue-pusher and queue-listener are required to in the same project. // 3. You can see more examples in `example` directory.
管理队列
我们建议使用Supervisor
来监控和控制监听器。
vendor/bin/aint-queue
AintQueue Console Tool Usage: command [options] [arguments] Options: -h, --help Display this help message -q, --quiet Do not output any message -V, --version Display this application version --ansi Force ANSI output --no-ansi Disable ANSI output -n, --no-interaction Do not ask any interactive question -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug Available commands: help Displays help for a command list Lists commands queue queue:clear Clear the queue. queue:dashboard Start http server for dashboard. queue:reload-failed Reload all the failed jobs onto the waiting queue. queue:status Get the execute status of specific queue. worker worker:listen Listen the queue. worker:reload Reload worker for the queue. worker:run Run the specific job. worker:stop Stop listening the queue.
测试
composer test
贡献
您可以通过以下三种方式之一进行贡献
代码贡献过程并不十分正式。您只需确保遵循PSR-2、PSR-12编码规范。任何新的代码贡献必须附带相应的单元测试(如有适用)。
许可证
MIT