openclerk / jobs
此软件包最新版本(0.1.0)没有可用的许可信息。
简单的PHP作业队列、执行和管理
0.1.0
2017-09-11 07:38 UTC
Requires
- monolog/monolog: ~1.11
- openclerk/config: ^0.1
- openclerk/db: ^0.1
- openclerk/events: ^0.2
Requires (Dev)
- soundasleep/component-tests: dev-master
This package is auto-updated.
Last update: 2024-09-14 10:07:53 UTC
README
一个简单的PHP作业队列、执行和管理库,由Openclerk和CryptFolio使用。
虽然cron作业是运行定期任务的一种简单方法,但openclerk/jobs
允许以可靠的方式定义、执行和管理任务。
安装
在您的项目composer.json
中将openclerk/jobs
作为依赖项包含,然后运行composer update
将其安装到项目中
{ "require": { "openclerk/jobs": "dev-master" } }
功能
- 立即排队作业以稍后执行
- 作业可以返回成功或失败(通过抛出异常)
- 重复失败的作业可以从不执行队列中删除
- 定义自己的作业选择算法
- 在作业执行过程中抛出的任何异常都将存储在
job_exceptions
表中
使用
定义作业类
use \Openclerk\Jobs\Job; use \Db\Connection; use \Monolog\Logger; class MyJob implements Job { /** * @param $job the `job` instance, an array of `job_type`, `arg_id` and optionally `user_id` */ function __construct($job) { $this->job = $job; } function run(Connection $db, Logger $logger) { $q = $db->prepare("SELECT * FROM table WHERE id=?"); $q->execute(array($this->job['arg_id'])); if (!$q->fetch()) { throw new \Exception("Could not find that instance"); } } function passed(Connection $db, Logger $logger) { // (optional) the job passed } function failed(\Exception $runtime_exception, Connection $db, Logger $logger) { // (optional) the job failed } }
定义作业队列器
use \Openclerk\Jobs\JobQueuer; use \Openclerk\Jobs\Job; use \Db\Connection; use \Monolog\Logger; class MyJobQueuer extends JobQueuer { /** * Get a list of all jobs that need to be queued, as an array of associative * arrays with (job_type, arg_id, [user_id]). */ function findJobs(Connection $db, Logger $logger) { $result = array(); $q = $db->prepare("SELECT * FROM table WHERE is_queued=0"); $q->execute(); while ($r = $q->fetch()) { $result[] = array( 'job_type' => 'table', 'arg_id' => $r['id'], // optional: user_id ); } return $result; } /** * The given job has been queued up, so we can mark it as successfully queued. */ function jobQueued(Connection $db, Logger $logger, $job) { $q = $db->prepare("UPDATE table SET is_queued=1 WHERE id=?"); $q->execute(array($job['arg_id'])); } }
定义作业运行器
use \Openclerk\Jobs\JobRunner; use \Openclerk\Jobs\Job; use \Db\Connection; use \Monolog\Logger; class MyJobRunner extends JobRunner { /** * Get the {@link Job} to run for this job type. */ function createJob($job, Connection $db, Logger $logger) { switch ($job['job_type']) { case 'table': return new MyJob($job); } } }
编写批量脚本来执行队列器和运行器
例如,一个批量脚本来排队新作业
$logger = new \Monolog\Logger("batch_queue"); $logger->pushHandler(new \Core\MyLogger()); $runner = new MyJobQueuer(); $runner->queue(db(), $logger);
或者,一个批量脚本来运行单个作业
$logger = new \Monolog\Logger("batch_run"); $logger->pushHandler(new \Core\MyLogger()); $runner = new MyJobRunner(); $runner->runOne(db(), $logger);
然后可以设置cron等来运行这些批量脚本
扩展
捐赠
欢迎捐赠.
待办事项
- 捕获超时的作业
- 通过openclerk/events的job_failed事件
- 更多测试