jfloff/djjob-zf2

DJJob for zend 2

1.0.3 2013-02-07 14:07 UTC

This package is not auto-updated.

Last update: 2024-09-28 15:34:04 UTC


README

DJJob 允许 PHP 网络应用程序异步处理长时间运行的任务。它是 delayed_job(Shopify 开发)的 PHP 版本,自 2010 年 4 月以来已在 SeatGeek 的生产环境中使用。

与 delayed_job 类似,DJJob 使用 jobs 表来持久化和跟踪挂起、进行中和失败的任务。

要求

  • PHP5
  • PDO(随 PHP >= 5.1 一起发货)
  • (可选)PCNTL

设置

mysql db < jobs.sql

jobs 表结构如下

CREATE TABLE `jobs` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`handler` TEXT NOT NULL,
`queue` VARCHAR(255) NOT NULL DEFAULT 'default',
`attempts` INT UNSIGNED NOT NULL DEFAULT 0,
`run_at` DATETIME NULL,
`locked_at` DATETIME NULL,
`locked_by` VARCHAR(255) NULL,
`failed_at` DATETIME NULL,
`error` TEXT NULL,
`created_at` DATETIME NOT NULL
) ENGINE = INNODB;

告诉 DJJob 如何连接到您的数据库

DJJob::configure("mysql:host=127.0.0.1;dbname=djjob_test;port=3306", array('mysql_user' => "root", 'mysql_pass' => "topsecret"));

如果您使用 mysql,则需要单独传递数据库凭据。否则,您可以在连接字符串中提供这些凭据,有关解释,请参阅 http://stackoverflow.com/questions/237367/why-is-php-pdo-dsn-a-different-format-for-mysql-versus-postgresql

使用方法

任务是响应方法 performPHP 对象。任务是序列化并存储在数据库中。

class HelloWorldJob {
    public function __construct($name) {
        $this->name = $name;
    }
    public function perform() {
        echo "Hello {$this->name}!\n";
    }
}

DJJob::enqueue(new HelloWorldJob("delayed_job"));

与 delayed_job 不同,DJJob 没有任务优先级的概念(至少目前还没有)。相反,它支持多个队列。默认情况下,任务被放置在“默认”队列中。您可以指定替代队列,例如

DJJob::enqueue(new SignupEmailJob("dev@seatgeek.com"), "email");

在 SeatGeek,我们运行一个特定的电子邮件队列。电子邮件有一个 sendLater 方法,它将任务放置在 email 队列中。以下是我们的基本 Email 类的简化版本

class Email {
    public function __construct($recipient) {
        $this->recipient = $recipient;
    }
    public function send() {
        ...do some expensive work to build the email: geolocation, etc..
        ...use mail api to send this email
    }
    public function perform() {
        $this->send();
    }
    public function sendLater() {
        DJJob::enqueue($this, "email");
    }
}

由于 Email 有一个 perform 方法,所以电子邮件类的所有实例也是任务。

运行任务

运行工作进程就像这样简单

$worker = new DJWorker($options);
$worker->start();

初始化您的环境、连接到数据库等,由您自行决定。我们使用 symfony 的任务系统来运行工作进程,以下是我们的工作进程任务的示例

class jobsWorkerTask extends sfPropelBaseTask {
  protected function configure() {
    $this->namespace        = 'jobs';
    $this->name             = 'worker';
    $this->briefDescription = '';
    $this->detailedDescription = <<<EOF
The [jobs:worker|INFO] task runs jobs created by the DJJob system.
Call it with:

  [php symfony jobs:worker|INFO]
EOF;
    $this->addArgument('application', sfCommandArgument::OPTIONAL, 'The application name', 'customer');
    $this->addOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'dev');
    $this->addOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel');
    $this->addOption('queue', null, sfCommandOption::PARAMETER_REQUIRED, 'The queue to pull jobs from', 'default');
    $this->addOption('count', null, sfCommandOption::PARAMETER_REQUIRED, 'The number of jobs to run before exiting (0 for unlimited)', 0);
    $this->addOption('sleep', null, sfCommandOption::PARAMETER_REQUIRED, 'Seconds to sleep after finding no new jobs', 5);
}

  protected function execute($arguments = array(), $options = array()) {
    // Database initialization
    $databaseManager = new sfDatabaseManager($this->configuration);
    $connection = Propel::getConnection($options['connection'] ? $options['connection'] : '');

    $worker = new DJWorker($options);
    $worker->start();
  }
}

如果数据库有任何连接问题,工作进程将退出。我们使用 god 来管理我们的工作进程,包括在它们由于任何原因退出时重启它们。

变更

  • 将 DJJob::configure 修改为接受一个选项数组
  • 通过切换到 PDO 消除了对 Propel 的依赖