bricev / jobqueue
JobQueue 是一个 PHP 组件,用于排队 `任务` 定义 `工作`,这些工作可以由异步和/或分布式 `工作者` 执行
Requires
- php: ^7.1
- http-interop/response-sender: ^1.0
- middlewares/error-handler: ^1.0
- middlewares/fast-route: ^1.0
- middlewares/payload: ^1.0
- middlewares/request-handler: ^1.0
- predis/predis: ^1.0
- psr/log: ^1.0
- ramsey/uuid: ^3.0
- relay/relay: ^2.0
- symfony/config: ^4.0
- symfony/console: ^4.0
- symfony/dependency-injection: ^4.0
- symfony/event-dispatcher: ^4.0
- symfony/yaml: ^4.0
- vlucas/phpdotenv: ^2.0
- zendframework/zend-diactoros: ^1.0
Requires (Dev)
- guzzlehttp/guzzle: ^6.1
- phpunit/phpunit: ^6
- symfony/process: ^4.0
README
简单的 Job/Queue PHP 组件,帮助应用程序通过多个工作者分发任务。
安装
此软件包可以通过 Composer 安装和自动加载
$ composer require bricev/jobqueue
应用程序提供一项 Web 服务和两个 CLIs。请阅读下面的文档以获取更多信息。
配置
- 必须安装 PHP 7.1
- 必须安装 Redis(用于队列数据持久化)
JOBQUEUE_ENV
环境变量可以设置为dev
、prod
(或任何字符串,默认未设置时为dev
)JOBQUEUE_REDIS_DSN
环境变量必须定义 Redis DSN(例如:'tcp://127.0.0.1:6379')
用法
定义一个工作
一个 工作
包含了工作者将要执行的代码。
每个工作必须在实现 ExecutableJob
接口的 PHP 类中定义
<?php use JobQueue\Domain\Job\ExecutableJob; use JobQueue\Domain\Task\Task; use Psr\Log\LoggerAwareTrait; final class DummyJob implements ExecutableJob { use LoggerAwareTrait; /** * * @param Task $task */ function setUp(Task $task) { // This is called before the `perform()` method /** @todo prepare the job execution */ } /** * * @param Task $task */ function perform(Task $task) { /** @todo do the job! */ } /** * * @param Task $task */ function tearDown(Task $task) { // This is called after the `perform()` method /** @todo clean up after the job execution */ } }
注意:ExecutableJob
接口扩展了 LoggerAwareInterface
,可以用来设置一个记录器。此文档提供了有关 PSR-3 日志接口的更多信息。
提示:可以通过运行此命令将软件包 psr/log
(repo)添加到 composer 项目中
composer require psr/log
可以使用 \Psr\Log\LoggerAwareTrait
来轻松添加一个记录器设置器并符合 LoggerAwareInterface
。
定义一个任务
创建一个 任务
需要以下元素
- 一个
配置文件
- 它可以是任何东西,它将任务分组到队列分区中,以便 JobQueue工作者
应用程序可以从一个(仅一个)配置文件中消费任务 - 一个
工作
- 包含执行任务所需的代码 - 可选的
参数
,在工作执行期间可以使用
在 PHP 中定义一个任务
<?php use JobQueue\Domain\Task\Profile; use JobQueue\Domain\Task\Task; use JobQueue\Tests\Domain\Job\DummyJob; $task = new Task( new Profile('foobar'), new DummyJob, [ 'foo' => 'bar', // [...] ] );
将任务添加到队列
首先,需要实例化队列。
这可以手动完成
<?php use JobQueue\Infrastructure\RedisQueue; use Predis\Client; $predis = new Client('tcp://:6379'); $queue = new RedisQueue($predis);
或使用 ServiceContainer(这需要适当的配置,请参阅上面的 配置
部分)
<?php use JobQueue\Infrastructure\ServiceContainer; $queue = ServiceContainer::getInstance()->queue;
然后,可以轻松地将任务入队
<?php /** @var \JobQueue\Domain\Task\Queue $queue */ /** @var \JobQueue\Domain\Task\Task $task */ $queue->add($task);
任务的工作将在工作者开始消费任务的配置文件时执行。此组件包含一个可执行的 PHP 工作者。有关其用法的更多详细信息,请参阅 CLI
部分。
工作者事件
工作者发出一些可以监听的事件
要拦截一个事件,可以使用服务容器中的 EventDispatcher
<?php use JobQueue\Infrastructure\ServiceContainer; $dispatcher = ServiceContainer::getInstance()->dispatcher; $dispatcher->addListener('task.failed', function ($event) { /** @var \JobQueue\Domain\Task\TaskHasFailed $event */ $task = $event->getTask(); // Do something... });
CLI
这些功能需要适当的配置,请参阅上面的 配置
部分。
可以使用 manager
应用程序对任务执行 CRUD 操作。
用法
$ bin/manager list # lists all commands $ bin/manager {command} --help # display the command help
可以使用 worker
应用程序消费入队的任务。
用法
$ bin/worker --help
可以将 worker
应用程序用作操作系统服务(例如,Unix 上的 upstart、systemd...)以在服务器上运行。
Web 服务
配置
应配置 Web 服务器以将 public/index.php
作为路由脚本提供服务。此功能需要适当的配置,请参阅上面的 配置
部分。
API
列出所有任务
GET /tasks
profile: string (a profile name that filters tasks)
status: waiting|running|finished|failed (a status that filters tasks)
order: date|profile|status (sort order, default: status)
返回一个任务数组
HTTP/1.1 200 Ok
Content-Type: application/json
[
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
]
错误
400 Bad Request
如果参数之一错误状态 "foo" 不存在
,状态不等于waiting|running|finished|failed
配置文件名只允许小写字母数字、破折号和下划线字符
,配置文件格式错误无法按 "foobar" 排序
,排序不等于date|profile|status
- 出现技术错误时返回
500 内部服务器错误
获取任务信息
GET /task/{identifier}
返回任务定义
HTTP/1.1 200 Ok
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
错误
- 如果无任务与标识符对应,则返回
404 未找到
- 出现技术错误时返回
500 内部服务器错误
创建新任务
POST /tasks
{
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
返回任务定义
HTTP/1.1 201 Created
Content-Type: application/json
{
"identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
"status": "waiting",
"profile": "foobar",
"job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
"date": "Fri, 23 Feb 2018 13:45:22 +0000",
"parameters": {
"name_1": "value_1",
"name_2": "value_2"
}
}
错误
- 如果 JSON 主体格式错误,则返回
400 请求错误
- 如果 JSON 对象缺少
job
值,则返回缺少作业
- 如果 JSON 对象缺少
profile
值,则返回缺少配置文件
配置文件名只允许小写字母数字、破折号和下划线字符
,配置文件格式错误参数格式错误
,如果 JSON 对象中的parameters
值不是键/值数组
- 如果 JSON 对象缺少
- 出现技术错误时返回
500 内部服务器错误
测试
首先,本地必须运行 Redis 服务器(127.0.0.1 的 6379 端口)。
然后,要运行测试,请使用以下命令
$ php vendor/bin/phpunit
MIT 许可证 (MIT)。请参阅 LICENSE 获取更多信息。