此软件包最新版本(0.1.0)没有可用的许可信息。

简单的PHP作业队列、执行和管理

0.1.0 2017-09-11 07:38 UTC

This package is auto-updated.

Last update: 2024-09-14 10:07:53 UTC


README

一个简单的PHP作业队列、执行和管理库,由OpenclerkCryptFolio使用。

虽然cron作业是运行定期任务的一种简单方法,但openclerk/jobs允许以可靠的方式定义、执行和管理任务。

安装

在您的项目composer.json中将openclerk/jobs作为依赖项包含,然后运行composer update将其安装到项目中

{
  "require": {
    "openclerk/jobs": "dev-master"
  }
}

功能

  1. 立即排队作业以稍后执行
  2. 作业可以返回成功或失败(通过抛出异常)
  3. 重复失败的作业可以从不执行队列中删除
  4. 定义自己的作业选择算法
  5. 在作业执行过程中抛出的任何异常都将存储在job_exceptions表中

使用

定义作业类

use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;

class MyJob implements Job {

  /**
   * @param $job the `job` instance, an array of `job_type`, `arg_id` and optionally `user_id`
   */
  function __construct($job) {
    $this->job = $job;
  }

  function run(Connection $db, Logger $logger) {
    $q = $db->prepare("SELECT * FROM table WHERE id=?");
    $q->execute(array($this->job['arg_id']));
    if (!$q->fetch()) {
      throw new \Exception("Could not find that instance");
    }
  }

  function passed(Connection $db, Logger $logger) {
    // (optional) the job passed
  }

  function failed(\Exception $runtime_exception, Connection $db, Logger $logger) {
    // (optional) the job failed
  }
}

定义作业队列器

use \Openclerk\Jobs\JobQueuer;
use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;

class MyJobQueuer extends JobQueuer {

  /**
   * Get a list of all jobs that need to be queued, as an array of associative
   * arrays with (job_type, arg_id, [user_id]).
   */
  function findJobs(Connection $db, Logger $logger) {
    $result = array();

    $q = $db->prepare("SELECT * FROM table WHERE is_queued=0");
    $q->execute();
    while ($r = $q->fetch()) {
      $result[] = array(
        'job_type' => 'table',
        'arg_id' => $r['id'],
        // optional: user_id
      );
    }

    return $result;
  }

  /**
   * The given job has been queued up, so we can mark it as successfully queued.
   */
  function jobQueued(Connection $db, Logger $logger, $job) {
    $q = $db->prepare("UPDATE table SET is_queued=1 WHERE id=?");
    $q->execute(array($job['arg_id']));
  }
}

定义作业运行器

use \Openclerk\Jobs\JobRunner;
use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;

class MyJobRunner extends JobRunner {

  /**
   * Get the {@link Job} to run for this job type.
   */
  function createJob($job, Connection $db, Logger $logger) {
    switch ($job['job_type']) {
      case 'table':
        return new MyJob($job);
    }
  }

}

编写批量脚本来执行队列器和运行器

例如,一个批量脚本来排队新作业

$logger = new \Monolog\Logger("batch_queue");
$logger->pushHandler(new \Core\MyLogger());

$runner = new MyJobQueuer();
$runner->queue(db(), $logger);

或者,一个批量脚本来运行单个作业

$logger = new \Monolog\Logger("batch_run");
$logger->pushHandler(new \Core\MyLogger());

$runner = new MyJobRunner();
$runner->runOne(db(), $logger);

然后可以设置cron等来运行这些批量脚本

扩展

  1. 使用require()而不是类来运行作业
  2. 仅运行特定类型的作业
  3. 仅运行具有特定属性的用户的作业
  4. 仅运行具有特定作业ID的作业
  5. 从网络管理员界面运行作业

捐赠

欢迎捐赠.

待办事项

  1. 捕获超时的作业
  2. 通过openclerk/events的job_failed事件
  3. 更多测试