2tvenom/php-queue

PHP任务执行队列

dev-master 2015-09-09 10:48 UTC

This package is auto-updated.

Last update: 2024-09-08 22:04:21 UTC


README

The PhpQueue库提供执行PHP脚本的队列。该代码由Ven于2013年8月至2013年10月开发并维护。

您可以在github上在此处发送评论、补丁、问题,或发送到2tvenom@gmail.com

  1. 队列
  2. 任务
  3. 任务和子任务
  4. 任务和子任务中的错误
  5. 回调和错误回调
  6. 队列驱动程序
  7. Autoloader
  8. Web界面

##队列 ###简单队列

###向队列中添加任务

PhpQueue有组件:队列、队列驱动程序、任务执行者和任务。

#####示例

<?
use PhpQueue\Queue;
use PhpQueue\Drivers\SqlPdoDriver;
use PhpQueue\TaskPerformer;
use PhpQueue\Task;

$pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", "");
$driver = new SqlPdoDriver($pdo);
$queue = new Queue($driver);

$task = new Task("Job");
$queue->add_task($task);
?>

###从队列中获取任务并执行 获取队列中任务并执行的步骤。如果队列中没有任务,则队列返回false

  1. 连接到队列
  2. 从队列中获取任务
  3. 队列设置任务状态为“处理中”
  4. 执行任务
  5. 在队列中修改任务状态(警告! 此步骤是必需的)

#####示例

<?
$pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", "");

$driver = new SqlPdoDriver($pdo);
$queue = new Queue($driver);
$task_performer = new TaskPerformer();

$task = $queue->get_task();
//in queue this task have status "In process"
$task = $task_performer->execute_task($task);
$queue->modify_task($task);
?>

任务

创建任务

任务名称这是在PhpQueue\Interfaces\IJob中实现的类的名称

#####示例

<?
class Job implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        return 2*2;
    }
}

$task = new PhpQueue\Task("Job");
?>

请求数据

任务可以接收输入数据。这是一个对象、数组或标量值。

#####示例

<?
class Job implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        return $task->get_request_data() * 2;
    }
}
?>

您可以在构造函数中设置数据$task = new PhpQueue\Task("Job", 100);或通过方法$task->set_request_data(100);

###从队列中选择任务

###优先级 队列中的任务按优先级排序。默认情况下,所有任务都具有零优先级。如果任务具有相同的优先级,则按ID降序排序。

设置优先级:set_priority($priority)。优先级必须是整数。

获取优先级:get_priority()

####示例

<?
$task = new PhpQueue\Task("Job");
$queue->add_task($task);

$super_task = new PhpQueue\Task("SuperJob");
$super_task->set_priority(10);
$queue->add_task($super_task);

####结果 首先执行的任务将是SuperJob,其次是Job

###任务组ID

可以通过task_group_id将队列划分为几个队列。默认情况下,所有任务的组ID为0

设置任务组ID:set_task_group_id($group_id)。组ID必须是整数。

获取优先级:get_task_group_id()

####示例 添加任务

<?
$task1 = new PhpQueue\Task("Job1");
$task1->set_task_group_id(1);
$queue->add_task($task1);

$task2 = new PhpQueue\Task("Job2");
$task2->set_task_group_id(2);
$queue->add_task($task2);

获取任务

<?
$task1 = $queue->get_task(array(
    TaskConst::TASK_GROUP_ID => 1
));

$task2 = $queue->get_task(array(
    TaskConst::TASK_GROUP_ID => 2
));

$task3 = $queue->get_task();

####结果 task1是Job1

task2是Job2

task3是false

执行日期

任务将在此日期后执行。

设置执行日期:set_execution_date($date)。日期格式为Y-m-d H:i:s

获取日期:get_execution_date()

####示例

<?
$task = new Task("JobByDate");
$task->set_execution_date(date('Y-m-d H:i:s', strtotime('now') + 10));
$queue->add_task($task);

$task1 = $queue->get_task();
sleep(15);
$task2 = $queue->get_task();

####结果 task1是false

task2是JobByDate

唯一ID

所有任务都有唯一的ID。在创建对象时生成。唯一ID是32个字符的字符串(md5)。

根据唯一ID从队列中选择未设置任务状态为 进行中

获取任务的唯一ID: get_uniqid()

####示例 添加到队列

<?
$task = new Task("Job");
$queue->add_task($task);
$uniqid = $task->get_uniqid();

####结果 字符串: a8a042ffabf5230dfdfa0a2cf9d47110

根据唯一ID选择

<?
$task = $queue->get_task(array(
    TaskConst::UNIQID => "a8a042ffabf5230dfdfa0a2cf9d47110"
));

####结果 $task 是唯一ID为 a8a042ffabf5230dfdfa0a2cf9d47110Job 任务。任务在队列中仍处于 状态

##任务和子任务

###子任务

PhpQueue有两种任务类型

  • 简单任务
  • 带有子任务的任务

任务可以有一个嵌套层级的子任务。

####示例 添加子任务

<?
$task = new Task("JobWithSubTasks");
$task
    ->sub_tasks()
    ->add(new \PhpQueue\Task("Job1", 5))
    ->add(new \PhpQueue\Task("Job2", 10))
    ->add(new \PhpQueue\Task("Job3", 15));
    
$queue->add_task($task);

执行者只会执行子任务。

####示例 执行子任务

for($i=0; $i<3; $i++){
    $task = $queue->get_task();
    $task = $task_performer->execute_task($task);
    $queue->modify_task($task);
}

####结果 执行步骤

  1. 执行 Job1。状态完成
  2. 执行 Job2。状态完成
  3. 执行 Job3。状态完成
  4. JobWithSubTasks 状态完成。 父任务不执行。

独占任务

默认情况下,任务具有非独占状态 false

非独占

任务的子任务可以执行所有任务执行者。

#####示例 添加任务

<?
$task = new Task("NonExclusiveTask");
$task
    ->sub_tasks()
    ->add(new \PhpQueue\Task("Job1", 5))
    ->add(new \PhpQueue\Task("Job2", 10))
    ->add(new \PhpQueue\Task("Job3", 15));
    
$queue->add_task($task);

在三个服务器 (server1, server2, server3) 上启动了 TaskPerformers

<?
$performer_name = php_uname('n');
$task_performer = new TaskPerformer($performer_name);
$task = $queue->get_task();
$task = $task_performer->execute_task($task);
$queue->modify_task($task);

####结果 NonExclusiveTask 任务状态是 完成

  • Job1server1 执行。状态 完成
  • Job2server2 执行。状态 完成
  • Job3server3 执行。状态 完成

获取执行者名称: get_performer()。默认执行者名称是 Default_Performer

独占

任务的子任务只能执行一个任务执行者。

设置独占: set_exclusive(true)

#####示例 添加任务

<?
$task = new Task("ExclusiveTask");
$task
    ->set_exclusive(true)
    ->sub_tasks()
    ->add(new \PhpQueue\Task("Job1", 5))
    ->add(new \PhpQueue\Task("Job2", 10))
    ->add(new \PhpQueue\Task("Job3", 15));
    
$queue->add_task($task);

在三个服务器 (server1, server2, server3) 上启动了 TaskPerformers

<?
$performer_name = php_uname('n');
$task_performer = new TaskPerformer($performer_name);
$task = $queue->get_task(
    array(TaskConst::PERFORMER => $performer_name)
);
$task = $task_performer->execute_task($task);
$queue->modify_task($task);

####结果 ExclusiveTask 任务状态是 完成

  • Job1server1 执行。状态 完成
  • Job2server1 执行。状态 完成
  • Job3server1 执行。状态 完成
  • server2 从队列接收到 false
  • server3 从队列接收到 false

####警告 ####在使用独占任务时,所有 TaskPerformers 必须有唯一名称。如果没有这样做,可能会破坏队列的逻辑。如果不在队列请求 get_task() 中添加 TaskConst::PERFORMER 唯一名称,队列和执行者不会接收到第一个执行后的第二个子任务。

任务响应,日志,父任务

响应

任务响应返回 run 方法的数据

####示例

<?
$task = new Task("ParentJob");
$task
    ->set_exclusive(true)
    ->sub_tasks()
    ->add(new \PhpQueue\Task("Job1", 5))
    ->add(new \PhpQueue\Task("Job2", 10))
    ->add(new \PhpQueue\Task("Job3", 15));
    
class Job1 implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        $sub_task_request_data = $task->get_request_data();
        
        return $sub_task_request_data + 5;
    }
}

####结果 获取响应: $task->response()->get_response()

  • Job1 响应: 10
  • Job2 响应: 15
  • Job3 响应: 20

日志

任务中的日志记录 ####示例

<?
class Job1 implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        $arg1 = 1;
        $task->response()->set_log('Step 1');
        
        $arg2 = 2;
        $task->response()->set_log('Step 2');
        
        return $arg1+$arg2;
    }
}

####结果

<?
$logs = $task->response()->get_log();

返回:数组 array('Step 1', 'Step2');

访问父任务

方法 $task->parent_task()

####示例

<?
$task = new Task("ParentTask", 5);
$task
    ->set_exclusive(true)
    ->sub_tasks()
    ->add(new \PhpQueue\Task("Job1", 5))
    ->add(new \PhpQueue\Task("Job2", 10))
    ->add(new \PhpQueue\Task("Job3", 15));
    
class Job1 implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        $sub_task_request_data = $task->get_request_data();
        $parent_request_data = $task->parent_task()->get_request_data();
        
        return $sub_task_request_data * $parent_request_data;
    }
}
class Job2 extends Job1 {}
class Job3 extends Job1 {}

####结果 响应

  • Job1 响应: 25
  • Job2 响应: 50
  • Job3 响应: 75

任务和子任务中的错误

简单任务中的错误

在第一次异常后,任务得到状态: 错误。您可以设置任务的最大错误尝试次数。

设置最大错误尝试次数 $task->settings()->set_error_max_trial($trials)。其中 $trials 是最大尝试次数(int)。

获取当前尝试次数 $task->settings()->get_trial()。返回:int,当前尝试次数。

####示例

<?
$task = new Task("ExceptionTask");
$task->settings()->set_error_max_trial(5);

class ExceptionTask implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        if($task->settings()->get_trial() < 3)
        {
            throw new Exception("Test exception");
        }
        return 100;
    }
}

####结果 响应

  1. 错误。状态 "新"
  2. 错误。状态 "新"
  3. 完成。

带子任务的任务中的错误

默认情况下,在最后一次错误尝试后,父任务得到状态 错误

如果需要继续执行子任务,则需要将 error break 设置为 false

设置错误中断标志 $task->settings()->set_error_break($flag)。其中 $flag 是 bool。

####示例

<?
$task = new Task("TaskWithOneExceptionSubTask");
$task->
    subtasks()
    ->add(new \PhpQueue\Task("Job1"))
    ->add(new \PhpQueue\Task("ExceptionTask"))
    ->add(new \PhpQueue\Task("Job2"));

class ExceptionTask implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        throw new Exception("Test exception");
    }
}

class Job1 implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        return true;
    }
}

class Job2 extends Job1 {}

####错误中断标志 = false 的结果

  • Job1 状态 完成
  • ExceptionTask 状态 错误
  • TaskWithOneExceptionSubTask 状态 错误

####错误中断标志 = true 的结果

  • Job1 状态 完成
  • ExceptionTask 状态 错误
  • Job2 状态 完成
  • TaskWithOneExceptionSubTask 状态 完成但有错误

嵌套设置

在父任务中设置,覆盖子任务的设置。子任务的设置会覆盖父任务的设置。

####示例

<?
$task = new Task("ParentJob");
$task->settings()->set_error_max_trial(5);

$job1 = new \PhpQueue\Task("Job1");
$job2 = new \PhpQueue\Task("Job2");
$job2->settings()->set_error_max_trial(2);
$job3 = new \PhpQueue\Task("Job3");

$task->subtasks()->add(array($job1, $job2, $job3,));

####结果

  • Job1 最大尝试次数 5
  • Job1 最大尝试次数 2
  • Job1 最大尝试次数 5

错误日志

从任务获取异常日志:$task->response()->get_error() 返回:数组

回调和错误回调

回调

任务后执行回调。

PhpQueue 有两种类型的回调

  • 回调 PhpQueue\Interfaces\ICallback
  • 错误回调 PhpQueue\Interfaces\IErrorCallback

示例

<?
$task = new Task("TaskWithCallback");
$task->set_callback("SimpleCallback");

class SimpleCallback implements PhpQueue\Interfaces\ICallback
{
    public static function callback_run(Task $task)
    {
        echo "Callback";
    }
}

结果

执行了 TaskWithCallback 并在屏幕上显示 "Callback"。

错误回调

任务错误后执行错误回调

<?
$task = new Task("TaskWithError");
$task
    ->set_callback("SimpleCallback")
    ->set_error_callback("ErrorCallback");

class TaskWithError implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        throw new Exception("Test exception");
    }
}

class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback
{
    public static function callback_error_run(Task $task)
    {
        echo "ERROR!";
    }
}

结果

执行了 TaskWithCallback 并在屏幕上显示 "ERROR!"。

父任务的回调

子任务和父任务都可以有回调。首先执行子任务回调,然后执行父任务回调。

示例

<?
$task = new Task("TaskWithSubTasks");
$task->set_callback("ParentCallback")

$job1 =new \PhpQueue\Task("Job");
$job1->set_callback("Callback");
$job2 =new \PhpQueue\Task("Job");
$job3 =new \PhpQueue\Task("Job");
$job4 =new \PhpQueue\Task("Job");
$job4->set_callback("Callback")

$task->subtasks()->add(array($job1, $job2, $job3, $job4));

class Job implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        echo "Job";
    }
}

class Callback implements PhpQueue\Interfaces\ICallback
{
    public static function callback_run(Task $task)
    {
        echo "Callback";
    }
}

结果

显示在屏幕上

Job
Callback
ParentCallback
Job
ParentCallback
Job
ParentCallback
Job
Callback
ParentCallback

全局回调

任务执行者可以调用 Callback 和 Error 回调

示例

<?
$task = new Task("TaskWithException");

class TaskWithException implements PhpQueue\Interfaces\IJob
{
    public static function run(Task $task)
    {
        throw new Exception("Test exception");
    }
}

class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback
{
    public static function callback_error_run(Task $task)
    {
        echo "GLOBAL ERROR!";
    }
}

$task_performer = new TaskPerformer();
$task_performer->set_global_error_callback("ErrorCallback");
$task_performer->execute_task($task);

结果

显示在屏幕上 "GLOBAL ERROR!"

回调中的错误

回调任务中的错误后,任务获得状态:CALLBACK ERROR 而不执行其他回调。

回调执行优先级

回调

  1. 子任务回调/错误回调
  2. 父任务回调/错误回调
  3. 全局回调/错误回调

队列驱动程序

PhpQueue 支持

  • MySql
  • PostgreSQL
  • SQLite
  • 文件

未来:Mongo, Redis

SqlPdoDriver

要求: PDO

示例

创建基于 MySQL 的队列

<?
$pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", "");
$driver = new SqlPdoDriver($pdo);
$queue = new Queue($driver);

FileDriver

要求: SimpleXMLElement

示例

创建基于文件的队列

<?
$driver = new SqlPdoDriver('queue_folder');
$queue = new Queue($driver);

Autoloader

示例

<?
require_once __DIR__ . "/../src/PhpQueue/AutoLoader.php";

use PhpQueue\AutoLoader;
AutoLoader::RegisterDirectory(array('Callbacks', 'Tasks/Example'));
AutoLoader::RegisterNamespaces(array('PhpQueue' => '../src/PhpQueue'));
AutoLoader::RegisterAutoLoader();

Web界面

Alt text

安装

只需将队列文件从 web 文件夹复制到 web 服务器文件夹。

驱动连接

在文件 web/index.php 中找到此行

<?
$web_driver = include(DRIVERS_PATH . "SqlWebDriver.php");

并更改路径到具有驱动程序的文件(如果需要的话)。

###驱动连接属性

drivers 文件夹中的驱动文件中的连接属性。

####示例 SqlWebDriver 的连接属性

<?
define("QUEUE_HOST", "localhost");
define("QUEUE_DATABASE", "queue");
define("QUEUE_USER", "root");
define("QUEUE_PASSWORD", "");
define("QUEUE_TABLE", "queue_tasks");

自定义

Alt text

您可以自定义网页界面

PhpQueue 有 4 种类型的网页界面

  • 列表
  • 详细信息
  • 日志
  • 错误日志

需要自定义任务时,需要创建一个与任务名称相同的文件夹。

####示例

<?
//Task class
class JobLog implements Interfaces\IJob
{
    public static function run(Task $task)
    {
        for($i=0; $i<6; $i++)
        {
            $task->response()->set_log(ceil(rand(0, 100) / 10));
        }
        return $task->get_request_data()*2;
    }
}

// /web/templates/task_render/JobLog/log.php

foreach($list as $_id => $log_string){
    $performed = (int)$log_string;
    $not_performed = 10 - $performed;
    echo "Fork {$_id} <span style='color:#009944'>" . str_repeat("#", $performed) . "</span><span style='color:#ff0000'>" . str_repeat("#",$not_performed) . "</span><br>";
}

// /web/templates/task_render/JobLog/list.php

/**
 * @var array $task
 */
use PhpQueue\Task;
use PhpQueue\TaskConst;
?>
<div class="row">
    <div class="col-md-12">
        <span class="pull-right label label-<?= TaskModel::$class_by_status[$task[TaskConst::STATUS]] ?>">
            <?= TaskModel::$status_text[$task[TaskConst::STATUS]] ?>
            <? if ($task[TaskConst::STATUS] == Task::STATUS_IN_PROCESS && $task[TaskConst::SUBTASKS_QUANTITY] > 0) { ?>
                <?= round(100 - (int)$task[TaskConst::SUBTASKS_QUANTITY_NOT_PERFORMED] / ((int)$task[TaskConst::SUBTASKS_QUANTITY] / 100)); ?>%
            <? } ?>
        </span>
        <h1 class="list-group-item-heading" style="text-align: center; color: #3a87ad">
            <?= $task[TaskConst::TASK_NAME] ?>
        </h1>
    </div>
</div>

结果

自定义列表

Custom list

自定义日志

Custom log