chh/kue

多任务队列实现的简单接口

v1.0.0 2014-01-07 15:06 UTC

This package is not auto-updated.

Last update: 2024-09-14 12:44:33 UTC


README

一个简约、通用且与框架无关的任务队列接口

安装

通过 Composer 安装

% wget https://getcomposer.org.cn/composer.phar
% php composer.phar require chh/kue:*@dev

使用

任务

任务是实现 Kue\Job 接口的类。此接口指定了一个方法 —— run()。当工作脚本从队列中拉取任务时,会调用 run 方法。

为什么不直接实现 callable 呢?

因为如果合同规定任务必须是可调用的,那么每个回调(即使是闭包)都可以满足它。这种方法的缺点是,工作通常在将任务放入队列的脚本之外的其他进程中运行。闭包和函数无法序列化,因此无法将它们传输到工作进程。对象可以在一个进程中序列化,在另一个进程中反序列化,并保留所有状态和上下文。

队列

一个队列实现了 Kue\Queue 接口。此接口如下所示

interface Kue\Queue
{
    function push(Kue\Job $job);
    function pop();
    function flush();
    function process(Kue\Worker $worker);
}

队列的职责是,作为将任务推入队列的用户和通过轮询 pop() 从队列中提取任务的工作脚本的传输层。

客户端应该在推送任务后调用 flush() 方法,并可以使用更有效的传输机制(例如批量请求)发送多个任务。

Kue 随附了一个 Kue\LocalQueue 用于开发环境。此队列在 push() 时将任务发送到本地网络套接字,当调用 pop() 时接收。

对于生产,建议在队列接口之后使用 SQS。

工作进程

工作进程接受队列,并开始使用 pop() 方法轮询任务。

interface Kue\Worker extends \Evenement\EventEmitterInterface
{
    function process(Kue\Queue $queue);
}

工作进程应该抽象处理队列中任务的战略。Kue 随附的工作进程实现对于大多数用例应该足够充分。

Kue 随附以下工作进程

  • Kue\SequentialWorker —— 一个简单的工人,它在一个 for 循环中调用队列的 pop() 方法,并严格按顺序处理任务。这很简单,并且可以在任何平台上工作,但应仅用于开发或 Windows。
  • Kue\PreforkingWorker —— 一个高级工人,它启动一个主进程并从父进程中派生一个工作进程池,所有进程都单独调用 pop()。它可以并发运行作业,并且具有更高的容错性,因为当工作进程死亡时,主进程会简单地启动一个新的工作进程。您将希望在生产中使用此工作进程。它仅在 *nix 平台上工作,如 OS X 和 Linux。

所有工作进程都是 Evenement 事件发射器,并发出以下标准事件

  • init: 在作业运行之前发出,带作业作为参数调用。
  • success: 作业成功处理。处理程序接收作业作为参数。
  • exception: 运行作业时发生异常。作业和异常作为参数传递。

Kue\PreforkingWorker 支持以下附加事件

  • beforeFork: 在从父进程派生工作进程之前发出。
  • afterFork: 从主进程派生后由子进程发出。用于在工作进程中对资源进行重新初始化。

这些工作人员通常由Kue附带的Symfony控制台命令来管理。

use Symfony\Component\Console\Application;
use Kue\Command\WorkCommand;
use Kue\LocalQueue;

$worker = new WorkCommand(new LocalQueue);

$app = new Application;
$app->add($worker);

$app->run();

可以使用位于Symfony控制台应用程序中的kue:work命令来运行队列工作人员。可以通过-c标志切换工作人员实现。

  • 1:使用Kue\SequentialWorker
  • >1:使用Kue\PreforkingWorker,拥有c个工作池。

要为自动选择的工作实例附加您自己的事件监听器,可以将事件名称加处理器的数组作为第二个参数传递给命令构造函数。

$webApp = new MyApplication;

$worker = new WorkerCommand(new LocalQueue, array(
    'init' => function($job) use ($webApp) {
        $job->application = $webApp;
    }
));