libcast / jobqueue
JobQueue 是一个 PHP 组件,用于队列定义 `jobs` 的 `tasks`,这些任务可以由异步和/或分布式 `workers` 执行。
Requires
- php: ^7.1
- http-interop/response-sender: ^1.0
- middlewares/error-handler: ^1.0
- middlewares/fast-route: ^1.0
- middlewares/payload: ^1.0
- middlewares/request-handler: ^1.0
- predis/predis: ^1.0
- psr/log: ^1.0
- ramsey/uuid: ^3.0
- relay/relay: ^2.0
- symfony/config: ^4.0
- symfony/console: ^4.0
- symfony/dependency-injection: ^4.0
- symfony/event-dispatcher: ^4.0
- symfony/yaml: ^4.0
- vlucas/phpdotenv: ^2.0
- zendframework/zend-diactoros: ^1.0
Requires (Dev)
- guzzlehttp/guzzle: ^6.1
- phpunit/phpunit: ^6
- symfony/process: ^4.0
README
简单的 Job/Queue PHP 组件,帮助应用通过多个 worker 分布式任务。
安装
此包可通过 Composer 安装和自动加载。
$ composer require bricev/jobqueue
应用程序提供了一个 webservice 和两个 CLIs。请参阅以下文档以获取更多信息。
配置
- 必须安装 PHP 7.1
- 必须安装 Redis(用于队列数据持久化)
JOBQUEUE_ENV
环境变量可以设置为dev
、prod
(或任何字符串,未设置时默认为dev
)JOBQUEUE_REDIS_DSN
环境变量必须定义 Redis DSN(例如:'tcp://127.0.0.1:6379')
使用
定义一个 job
job
包含了由 worker 执行任务的代码。
每个 job 必须在实现 ExecutableJob
接口的 PHP 类中定义
<?php use JobQueue\Domain\Job\ExecutableJob; use JobQueue\Domain\Task\Task; use Psr\Log\LoggerAwareTrait; final class DummyJob implements ExecutableJob { use LoggerAwareTrait; /** * * @param Task $task */ function setUp(Task $task) { // This is called before the `perform()` method /** @todo prepare the job execution */ } /** * * @param Task $task */ function perform(Task $task) { /** @todo do the job! */ } /** * * @param Task $task */ function tearDown(Task $task) { // This is called after the `perform()` method /** @todo clean up after the job execution */ } }
注意:ExecutableJob
接口扩展了 LoggerAwareInterface
,可用于设置日志记录器。此文档提供了有关 PSR-3 日志接口的更多信息。
提示:可以通过运行以下命令将包 psr/log
添加到 composer 项目中(仓库)
composer require psr/log
\Psr\Log\LoggerAwareTrait
可以用来轻松添加日志设置器并符合 LoggerAwareInterface
。
定义一个 task
创建一个 task
需要以下元素
- 一个
profile
- 它可以是任何东西,它将任务分组到队列分区中,以便 JobQueueworker
应用程序可以从一个(仅一个)配置文件中消费任务 - 一个
job
- 包含执行任务的代码 - 可选的
parameters
,可以在 job 执行期间使用
在 PHP 中定义任务
<?php use JobQueue\Domain\Task\Profile; use JobQueue\Domain\Task\Task; use JobQueue\Tests\Domain\Job\DummyJob; $task = new Task( new Profile('foobar'), new DummyJob, [ 'foo' => 'bar', // [...] ] );
将任务添加到队列
首先,需要实例化队列。
这可以手动完成
<?php use JobQueue\Infrastructure\RedisQueue; use Predis\Client; $predis = new Client('tcp://:6379'); $queue = new RedisQueue($predis);
或者通过使用 ServiceContainer(这需要适当的配置,请参阅上面的 Configuration
部分)
<?php use JobQueue\Infrastructure\ServiceContainer; $queue = ServiceContainer::getInstance()->queue;
然后,可以轻松地将任务入队
<?php /** @var \JobQueue\Domain\Task\Queue $queue */ /** @var \JobQueue\Domain\Task\Task $task */ $queue->add($task);
任务的 job 将在 worker 开始消费任务配置文件时执行。此组件包含一个 PHP 可执行 worker。请参阅 CLI
部分以获取更多详细信息。
Worker 事件
worker 会发出一些可以监听的事件
事件名称 | 描述 | 事件属性 |
---|---|---|
worker.start |
在 worker 启动时触发 | $event->getWorker() |
worker.finished |
worker 完成运行后触发 | $event->getWorker() |
task.fetched |
每次 worker 从队列中提取任务时触发 | 获取任务:$event->getTask() |
task.executed |
当任务作业成功执行时触发 | 获取任务:$event->getTask() |
task.failed |
当任务作业执行失败时触发 | 获取任务:$event->getTask() |
要拦截事件,可以使用服务容器中的EventDispatcher
<?php use JobQueue\Infrastructure\ServiceContainer; $dispatcher = ServiceContainer::getInstance()->dispatcher; $dispatcher->addListener('task.failed', function ($event) { /** @var \JobQueue\Domain\Task\TaskHasFailed $event */ $task = $event->getTask(); // Do something... });
命令行界面
这些功能需要适当的配置,请参阅上文的配置
部分。
manager
应用程序可用于对任务执行CRUD操作。
用法
$ bin/manager list # lists all commands $ bin/manager {command} --help # display the command help
worker
应用程序可用于消费队列中的任务。
用法
$ bin/worker --help
worker
应用程序可作为操作系统服务(例如Unix上的upstart、systemd等)在服务器上运行。
网络服务
配置
应该配置一个网络服务器,以将public/index.php
作为路由脚本提供服务。此功能需要适当的配置,请参阅上文的配置
部分。
API
列出所有任务
GET /tasks
profile: string (a profile name that filters tasks)
status: waiting|running|finished|failed (a status that filters tasks)
order: date|profile|status (sort order, default: status)
返回任务数组
HTTP/1.1 200 Ok
Content-Type: application/json
[
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
]
错误
- 如果其中一个参数错误,将返回
400 Bad Request
- 如果状态不等于
waiting|running|finished|failed
,则返回“Status "foo" does not exists
” - 如果配置文件格式不正确,则返回“
Profile name only allows lowercase alphanumerical, dash and underscore characters
” - 如果排序不等于
date|profile|status
,则返回“Impossible to order by "foobar"
”
- 如果状态不等于
- 如果出现技术错误,则返回
500 Internal Server Error
获取任务信息
GET /task/{identifier}
返回任务定义
HTTP/1.1 200 Ok
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
错误
- 如果没有任务与标识符相对应,则返回
404 Not Found
- 如果出现技术错误,则返回
500 Internal Server Error
创建新的任务
POST /tasks
{
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
返回任务定义
HTTP/1.1 201 Created
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
错误
- 如果JSON主体格式不正确,则返回
400 Bad Request
- 如果JSON对象缺少
job
值,则返回“Missing job
” - 如果JSON对象缺少
profile
值,则返回“Missing profile
” - 如果配置文件格式不正确,则返回“
Profile name only allows lowercase alphanumerical, dash and underscore characters
” - 如果JSON对象中的
parameters
值不是键值数组,则返回“Malformed parameters
”
- 如果JSON对象缺少
- 如果出现技术错误,则返回
500 Internal Server Error
测试
首先,本地必须运行一个Redis服务器(127.0.0.1上的6379端口)。
然后,要运行测试,请使用以下命令
$ php vendor/bin/phpunit
MIT许可证(MIT)。有关更多信息,请参阅LICENSE