seatgeek/djjob

此包已被弃用且不再维护。未建议替代包。
关于此包最新版本(1.0.0)的许可证信息不可用。

允许PHP网络应用异步处理长时间运行的任务

1.0.0 2017-09-12 18:01 UTC

This package is not auto-updated.

Last update: 2020-09-28 09:33:34 UTC


README

DJJob允许PHP网络应用异步处理长时间运行的任务。它是Shopify开发的delayed_job(自2010年4月起在SeatGeek的生产环境中使用)的PHP端口。

与delayed_job类似,DJJob使用一个jobs表来持久化和跟踪挂起、进行中和失败的任务。

需求

  • PHP5
  • PDO(PHP >= 5.1自带)
  • (可选)PCNTL库

设置

导入SQL数据库表。

mysql db < jobs.sql

jobs表结构如下

CREATE TABLE `jobs` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`handler` TEXT NOT NULL,
`queue` VARCHAR(255) NOT NULL DEFAULT 'default',
`attempts` INT UNSIGNED NOT NULL DEFAULT 0,
`run_at` DATETIME NULL,
`locked_at` DATETIME NULL,
`locked_by` VARCHAR(255) NULL,
`failed_at` DATETIME NULL,
`error` TEXT NULL,
`created_at` DATETIME NOT NULL
) ENGINE = INNODB;

如果你传递的是数据记录ID而不是序列化BLOB数据,则需要将handler列的类型设置为BLOB。更多信息请见这里。这种情况可能适用于以下错误:unserialize(): Error at offset 2010 of 2425 bytes

告诉DJJob如何连接到数据库

DJJob::configure([
    'driver' => 'mysql',
    'host' => '127.0.0.1',
    'dbname' => 'djjob',
    'user' => 'root',
    'password' => 'topsecret',
]);

使用方法

任务是PHP对象,它响应一个perform方法。任务被序列化并存储在数据库中。

<?php
// Job class
class HelloWorldJob {
    public function __construct($name) {
        $this->name = $name;
    }
    public function perform() {
        echo "Hello {$this->name}!\n";
    }
}

// enqueue a new job
DJJob::enqueue(new HelloWorldJob("delayed_job"));

与delayed_job不同,DJJob没有任务优先级的概念(至少目前没有)。相反,它支持多个队列。默认情况下,任务被放在“默认”队列中。您可以指定一个替代队列,如下所示

DJJob::enqueue(new SignupEmailJob("dev@seatgeek.com"), "email");

在SeatGeek,我们运行一个特定于电子邮件的队列。电子邮件有一个sendLater方法,它将任务放置在email队列中。以下是我们的基本Email类的简化版本

class Email {
    public function __construct($recipient) {
        $this->recipient = $recipient;
    }
    public function send() {
        // do some expensive work to build the email: geolocation, etc..
        // use mail api to send this email
    }
    public function perform() {
        $this->send();
    }
    public function sendLater() {
        DJJob::enqueue($this, "email");
    }
}

因为Email有一个perform方法,所以电子邮件类的所有实例也是任务。

运行任务

运行一个工作器就像这样简单

$worker = new DJWorker($options);
$worker->start();

初始化环境、连接到数据库等,由您自己决定。我们使用symfony的任务系统来运行工作器,以下是我们的工作器任务的示例

<?php
class jobsWorkerTask extends sfPropelBaseTask {
  protected function configure() {
    $this->namespace        = 'jobs';
    $this->name             = 'worker';
    $this->briefDescription = '';
    $this->detailedDescription = <<<EOF
The [jobs:worker|INFO] task runs jobs created by the DJJob system.
Call it with:

  [php symfony jobs:worker|INFO]
EOF;
    $this->addArgument('application', sfCommandArgument::OPTIONAL, 'The application name', 'customer');
    $this->addOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'dev');
    $this->addOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel');
    $this->addOption('queue', null, sfCommandOption::PARAMETER_REQUIRED, 'The queue to pull jobs from', 'default');
    $this->addOption('count', null, sfCommandOption::PARAMETER_REQUIRED, 'The number of jobs to run before exiting (0 for unlimited)', 0);
    $this->addOption('sleep', null, sfCommandOption::PARAMETER_REQUIRED, 'Seconds to sleep after finding no new jobs', 5);
}

  protected function execute($arguments = array(), $options = array()) {
    // Database initialization
    $databaseManager = new sfDatabaseManager($this->configuration);
    $connection = Propel::getConnection($options['connection'] ? $options['connection'] : '');

    $worker = new DJWorker($options);
    $worker->start();
  }
}

如果数据库有任何连接问题,工作器将退出。我们使用god来管理我们的工作器,包括在它们由于任何原因退出时重新启动它们。

更改

  • 通过切换到PDO消除了Propel依赖