seatgeek / djjob
允许PHP网络应用异步处理长时间运行的任务
Requires
- php: >=5.1.0
This package is not auto-updated.
Last update: 2020-09-28 09:33:34 UTC
README
DJJob允许PHP网络应用异步处理长时间运行的任务。它是Shopify开发的delayed_job(自2010年4月起在SeatGeek的生产环境中使用)的PHP端口。
与delayed_job类似,DJJob使用一个jobs
表来持久化和跟踪挂起、进行中和失败的任务。
需求
- PHP5
- PDO(PHP >= 5.1自带)
- (可选)PCNTL库
设置
导入SQL数据库表。
mysql db < jobs.sql
jobs
表结构如下
CREATE TABLE `jobs` ( `id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `handler` TEXT NOT NULL, `queue` VARCHAR(255) NOT NULL DEFAULT 'default', `attempts` INT UNSIGNED NOT NULL DEFAULT 0, `run_at` DATETIME NULL, `locked_at` DATETIME NULL, `locked_by` VARCHAR(255) NULL, `failed_at` DATETIME NULL, `error` TEXT NULL, `created_at` DATETIME NOT NULL ) ENGINE = INNODB;
如果你传递的是数据记录ID而不是序列化BLOB数据,则需要将
handler
列的类型设置为BLOB。更多信息请见这里。这种情况可能适用于以下错误:unserialize(): Error at offset 2010 of 2425 bytes
告诉DJJob如何连接到数据库
DJJob::configure([ 'driver' => 'mysql', 'host' => '127.0.0.1', 'dbname' => 'djjob', 'user' => 'root', 'password' => 'topsecret', ]);
使用方法
任务是PHP对象,它响应一个perform
方法。任务被序列化并存储在数据库中。
<?php // Job class class HelloWorldJob { public function __construct($name) { $this->name = $name; } public function perform() { echo "Hello {$this->name}!\n"; } } // enqueue a new job DJJob::enqueue(new HelloWorldJob("delayed_job"));
与delayed_job不同,DJJob没有任务优先级的概念(至少目前没有)。相反,它支持多个队列。默认情况下,任务被放在“默认”队列中。您可以指定一个替代队列,如下所示
DJJob::enqueue(new SignupEmailJob("dev@seatgeek.com"), "email");
在SeatGeek,我们运行一个特定于电子邮件的队列。电子邮件有一个sendLater
方法,它将任务放置在email
队列中。以下是我们的基本Email
类的简化版本
class Email { public function __construct($recipient) { $this->recipient = $recipient; } public function send() { // do some expensive work to build the email: geolocation, etc.. // use mail api to send this email } public function perform() { $this->send(); } public function sendLater() { DJJob::enqueue($this, "email"); } }
因为Email
有一个perform
方法,所以电子邮件类的所有实例也是任务。
运行任务
运行一个工作器就像这样简单
$worker = new DJWorker($options); $worker->start();
初始化环境、连接到数据库等,由您自己决定。我们使用symfony的任务系统来运行工作器,以下是我们的工作器任务的示例
<?php class jobsWorkerTask extends sfPropelBaseTask { protected function configure() { $this->namespace = 'jobs'; $this->name = 'worker'; $this->briefDescription = ''; $this->detailedDescription = <<<EOF The [jobs:worker|INFO] task runs jobs created by the DJJob system. Call it with: [php symfony jobs:worker|INFO] EOF; $this->addArgument('application', sfCommandArgument::OPTIONAL, 'The application name', 'customer'); $this->addOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'dev'); $this->addOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel'); $this->addOption('queue', null, sfCommandOption::PARAMETER_REQUIRED, 'The queue to pull jobs from', 'default'); $this->addOption('count', null, sfCommandOption::PARAMETER_REQUIRED, 'The number of jobs to run before exiting (0 for unlimited)', 0); $this->addOption('sleep', null, sfCommandOption::PARAMETER_REQUIRED, 'Seconds to sleep after finding no new jobs', 5); } protected function execute($arguments = array(), $options = array()) { // Database initialization $databaseManager = new sfDatabaseManager($this->configuration); $connection = Propel::getConnection($options['connection'] ? $options['connection'] : ''); $worker = new DJWorker($options); $worker->start(); } }
如果数据库有任何连接问题,工作器将退出。我们使用god来管理我们的工作器,包括在它们由于任何原因退出时重新启动它们。
更改
- 通过切换到PDO消除了Propel依赖