libcast/jobqueue

此包已被废弃且不再维护。作者建议使用bricev/jobqueue包。

JobQueue 是一个 PHP 组件,用于队列定义 `jobs` 的 `tasks`,这些任务可以由异步和/或分布式 `workers` 执行。

v2.1.8 2018-05-30 14:55 UTC

README

Scrutinizer Code Quality Code Coverage Build Status

简单的 Job/Queue PHP 组件,帮助应用通过多个 worker 分布式任务。

安装

此包可通过 Composer 安装和自动加载。

$ composer require bricev/jobqueue

应用程序提供了一个 webservice 和两个 CLIs。请参阅以下文档以获取更多信息。

配置

  • 必须安装 PHP 7.1
  • 必须安装 Redis(用于队列数据持久化)
  • JOBQUEUE_ENV 环境变量可以设置为 devprod(或任何字符串,未设置时默认为 dev
  • JOBQUEUE_REDIS_DSN 环境变量必须定义 Redis DSN(例如:'tcp://127.0.0.1:6379')

使用

定义一个 job

job 包含了由 worker 执行任务的代码。

每个 job 必须在实现 ExecutableJob 接口的 PHP 类中定义

<?php

use JobQueue\Domain\Job\ExecutableJob;
use JobQueue\Domain\Task\Task;
use Psr\Log\LoggerAwareTrait;

final class DummyJob implements ExecutableJob
{
    use LoggerAwareTrait;

    /**
     *
     * @param Task $task
     */
    function setUp(Task $task)
    {
        // This is called before the `perform()` method
        /** @todo prepare the job execution */
    }

    /**
     *
     * @param Task $task
     */
    function perform(Task $task)
    {
        /** @todo do the job! */
    }

    /**
     *
     * @param Task $task
     */
    function tearDown(Task $task)
    {
        // This is called after the `perform()` method
        /** @todo clean up after the job execution */
    }
}

注意:ExecutableJob 接口扩展了 LoggerAwareInterface,可用于设置日志记录器。此文档提供了有关 PSR-3 日志接口的更多信息。

提示:可以通过运行以下命令将包 psr/log 添加到 composer 项目中(仓库

composer require psr/log

\Psr\Log\LoggerAwareTrait 可以用来轻松添加日志设置器并符合 LoggerAwareInterface

定义一个 task

创建一个 task 需要以下元素

  • 一个 profile - 它可以是任何东西,它将任务分组到队列分区中,以便 JobQueue worker 应用程序可以从一个(仅一个)配置文件中消费任务
  • 一个 job - 包含执行任务的代码
  • 可选的 parameters,可以在 job 执行期间使用

在 PHP 中定义任务

<?php

use JobQueue\Domain\Task\Profile;
use JobQueue\Domain\Task\Task;
use JobQueue\Tests\Domain\Job\DummyJob;

$task = new Task(
    new Profile('foobar'),
    new DummyJob,
    [
        'foo' => 'bar',
        // [...]
    ]
);

将任务添加到队列

首先,需要实例化队列。

这可以手动完成

<?php

use JobQueue\Infrastructure\RedisQueue;
use Predis\Client;

$predis = new Client('tcp://:6379');
$queue = new RedisQueue($predis);

或者通过使用 ServiceContainer(这需要适当的配置,请参阅上面的 Configuration 部分)

<?php

use JobQueue\Infrastructure\ServiceContainer;

$queue = ServiceContainer::getInstance()->queue;

然后,可以轻松地将任务入队

<?php

/** @var \JobQueue\Domain\Task\Queue $queue */
/** @var \JobQueue\Domain\Task\Task $task */

$queue->add($task);

任务的 job 将在 worker 开始消费任务配置文件时执行。此组件包含一个 PHP 可执行 worker。请参阅 CLI 部分以获取更多详细信息。

Worker 事件

worker 会发出一些可以监听的事件

事件名称 描述 事件属性
worker.start 在 worker 启动时触发 $event->getWorker()
worker.finished worker 完成运行后触发 $event->getWorker()
task.fetched 每次 worker 从队列中提取任务时触发 获取任务:$event->getTask()
task.executed 当任务作业成功执行时触发 获取任务:$event->getTask()
task.failed 当任务作业执行失败时触发 获取任务:$event->getTask()

要拦截事件,可以使用服务容器中的EventDispatcher

<?php

use JobQueue\Infrastructure\ServiceContainer;

$dispatcher = ServiceContainer::getInstance()->dispatcher;
$dispatcher->addListener('task.failed', function ($event) {
    /** @var \JobQueue\Domain\Task\TaskHasFailed $event */
    $task = $event->getTask();
    // Do something...
});

命令行界面

这些功能需要适当的配置,请参阅上文的配置部分。

manager应用程序可用于对任务执行CRUD操作。

用法

$ bin/manager list               # lists all commands
$ bin/manager {command} --help   # display the command help

worker应用程序可用于消费队列中的任务。

用法

$ bin/worker --help

worker应用程序可作为操作系统服务(例如Unix上的upstart、systemd等)在服务器上运行。

网络服务

配置

应该配置一个网络服务器,以将public/index.php作为路由脚本提供服务。此功能需要适当的配置,请参阅上文的配置部分。

API

列出所有任务

GET /tasks
profile: string (a profile name that filters tasks)
status: waiting|running|finished|failed (a status that filters tasks)
order: date|profile|status (sort order, default: status)

返回任务数组

HTTP/1.1 200 Ok
Content-Type: application/json

[
    {
        "identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
        "status": "waiting",
        "profile": "foobar",
        "job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
        "date": "Fri, 23 Feb 2018 13:45:22 +0000",
        "parameters": {
            "name_1": "value_1",
            "name_2": "value_2"
        }
    }
]

错误

  • 如果其中一个参数错误,将返回400 Bad Request
    • 如果状态不等于waiting|running|finished|failed,则返回“Status "foo" does not exists
    • 如果配置文件格式不正确,则返回“Profile name only allows lowercase alphanumerical, dash and underscore characters
    • 如果排序不等于date|profile|status,则返回“Impossible to order by "foobar"
  • 如果出现技术错误,则返回500 Internal Server Error

获取任务信息

GET /task/{identifier}

返回任务定义

HTTP/1.1 200 Ok
Content-Type: application/json

{
    "identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
    "status": "waiting",
    "profile": "foobar",
    "job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
    "date": "Fri, 23 Feb 2018 13:45:22 +0000",
    "parameters": {
        "name_1": "value_1",
        "name_2": "value_2"
    }
}

错误

  • 如果没有任务与标识符相对应,则返回404 Not Found
  • 如果出现技术错误,则返回500 Internal Server Error

创建新的任务

POST /tasks

{
    "profile": "foobar",
    "job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
    "parameters": {
        "name_1": "value_1",
        "name_2": "value_2"
    }
}

返回任务定义

HTTP/1.1 201 Created
Content-Type: application/json

{
    "identifier": "47a5b21d-0a02-4e6e-b8c9-51dc1534cb68",
    "status": "waiting",
    "profile": "foobar",
    "job": "JobQueue\\Tests\\Domain\\Job\\DummyJob",
    "date": "Fri, 23 Feb 2018 13:45:22 +0000",
    "parameters": {
        "name_1": "value_1",
        "name_2": "value_2"
    }
}

错误

  • 如果JSON主体格式不正确,则返回400 Bad Request
    • 如果JSON对象缺少job值,则返回“Missing job
    • 如果JSON对象缺少profile值,则返回“Missing profile
    • 如果配置文件格式不正确,则返回“Profile name only allows lowercase alphanumerical, dash and underscore characters
    • 如果JSON对象中的parameters值不是键值数组,则返回“Malformed parameters
  • 如果出现技术错误,则返回500 Internal Server Error

测试

首先,本地必须运行一个Redis服务器(127.0.0.1上的6379端口)。

然后,要运行测试,请使用以下命令

$ php vendor/bin/phpunit

MIT许可证(MIT)。有关更多信息,请参阅LICENSE