2tvenom / php-queue
PHP任务执行队列
Requires
- php: >=5.4.0
This package is auto-updated.
Last update: 2024-09-08 22:04:21 UTC
README
The PhpQueue库提供执行PHP脚本的队列。该代码由Ven于2013年8月至2013年10月开发并维护。
您可以在github上在此处发送评论、补丁、问题,或发送到2tvenom@gmail.com
##队列 ###简单队列
###向队列中添加任务
PhpQueue有组件:队列、队列驱动程序、任务执行者和任务。
#####示例
<? use PhpQueue\Queue; use PhpQueue\Drivers\SqlPdoDriver; use PhpQueue\TaskPerformer; use PhpQueue\Task; $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver); $task = new Task("Job"); $queue->add_task($task); ?>
###从队列中获取任务并执行 获取队列中任务并执行的步骤。如果队列中没有任务,则队列返回false
。
- 连接到队列
- 从队列中获取任务
- 队列设置任务状态为“处理中”
- 执行任务
- 在队列中修改任务状态(警告! 此步骤是必需的)
#####示例
<? $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver); $task_performer = new TaskPerformer(); $task = $queue->get_task(); //in queue this task have status "In process" $task = $task_performer->execute_task($task); $queue->modify_task($task); ?>
任务
创建任务
任务名称这是在PhpQueue\Interfaces\IJob
中实现的类的名称
#####示例
<? class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return 2*2; } } $task = new PhpQueue\Task("Job"); ?>
请求数据
任务可以接收输入数据。这是一个对象、数组或标量值。
#####示例
<? class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return $task->get_request_data() * 2; } } ?>
您可以在构造函数中设置数据$task = new PhpQueue\Task("Job", 100);
或通过方法$task->set_request_data(100);
###从队列中选择任务
###优先级 队列中的任务按优先级排序。默认情况下,所有任务都具有零优先级。如果任务具有相同的优先级,则按ID降序排序。
设置优先级:set_priority($priority)
。优先级必须是整数。
获取优先级:get_priority()
####示例
<? $task = new PhpQueue\Task("Job"); $queue->add_task($task); $super_task = new PhpQueue\Task("SuperJob"); $super_task->set_priority(10); $queue->add_task($super_task);
####结果 首先执行的任务将是SuperJob
,其次是Job
###任务组ID
可以通过task_group_id
将队列划分为几个队列。默认情况下,所有任务的组ID为0
。
设置任务组ID:set_task_group_id($group_id)
。组ID必须是整数。
获取优先级:get_task_group_id()
####示例 添加任务
<? $task1 = new PhpQueue\Task("Job1"); $task1->set_task_group_id(1); $queue->add_task($task1); $task2 = new PhpQueue\Task("Job2"); $task2->set_task_group_id(2); $queue->add_task($task2);
获取任务
<? $task1 = $queue->get_task(array( TaskConst::TASK_GROUP_ID => 1 )); $task2 = $queue->get_task(array( TaskConst::TASK_GROUP_ID => 2 )); $task3 = $queue->get_task();
####结果 task1是Job1
task2是Job2
task3是false
执行日期
任务将在此日期后执行。
设置执行日期:set_execution_date($date)
。日期格式为Y-m-d H:i:s
获取日期:get_execution_date()
####示例
<? $task = new Task("JobByDate"); $task->set_execution_date(date('Y-m-d H:i:s', strtotime('now') + 10)); $queue->add_task($task); $task1 = $queue->get_task(); sleep(15); $task2 = $queue->get_task();
####结果 task1是false
task2是JobByDate
唯一ID
所有任务都有唯一的ID。在创建对象时生成。唯一ID是32个字符的字符串(md5)。
根据唯一ID从队列中选择未设置任务状态为 进行中
获取任务的唯一ID: get_uniqid()
。
####示例 添加到队列
<? $task = new Task("Job"); $queue->add_task($task); $uniqid = $task->get_uniqid();
####结果 字符串: a8a042ffabf5230dfdfa0a2cf9d47110
根据唯一ID选择
<? $task = $queue->get_task(array( TaskConst::UNIQID => "a8a042ffabf5230dfdfa0a2cf9d47110" ));
####结果 $task 是唯一ID为 a8a042ffabf5230dfdfa0a2cf9d47110
的 Job
任务。任务在队列中仍处于 新
状态
##任务和子任务
###子任务
PhpQueue有两种任务类型
- 简单任务
- 带有子任务的任务
任务可以有一个嵌套层级的子任务。
####示例 添加子任务
<? $task = new Task("JobWithSubTasks"); $task ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
执行者只会执行子任务。
####示例 执行子任务
for($i=0; $i<3; $i++){ $task = $queue->get_task(); $task = $task_performer->execute_task($task); $queue->modify_task($task); }
####结果 执行步骤
- 执行
Job1
。状态完成 - 执行
Job2
。状态完成 - 执行
Job3
。状态完成 JobWithSubTasks
状态完成。 父任务不执行。
独占任务
默认情况下,任务具有非独占状态 false
。
非独占
任务的子任务可以执行所有任务执行者。
#####示例 添加任务
<? $task = new Task("NonExclusiveTask"); $task ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
在三个服务器 (server1, server2, server3) 上启动了 TaskPerformers
<? $performer_name = php_uname('n'); $task_performer = new TaskPerformer($performer_name); $task = $queue->get_task(); $task = $task_performer->execute_task($task); $queue->modify_task($task);
####结果 NonExclusiveTask 任务状态是 完成
Job1
由server1
执行。状态完成
Job2
由server2
执行。状态完成
Job3
由server3
执行。状态完成
获取执行者名称: get_performer()
。默认执行者名称是 Default_Performer
独占
任务的子任务只能执行一个任务执行者。
设置独占: set_exclusive(true)
#####示例 添加任务
<? $task = new Task("ExclusiveTask"); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); $queue->add_task($task);
在三个服务器 (server1, server2, server3) 上启动了 TaskPerformers
<? $performer_name = php_uname('n'); $task_performer = new TaskPerformer($performer_name); $task = $queue->get_task( array(TaskConst::PERFORMER => $performer_name) ); $task = $task_performer->execute_task($task); $queue->modify_task($task);
####结果 ExclusiveTask 任务状态是 完成
Job1
由server1
执行。状态完成
Job2
由server1
执行。状态完成
Job3
由server1
执行。状态完成
server2
从队列接收到false
server3
从队列接收到false
####警告 ####在使用独占任务时,所有 TaskPerformers 必须有唯一名称。如果没有这样做,可能会破坏队列的逻辑。如果不在队列请求 get_task()
中添加 TaskConst::PERFORMER 唯一名称,队列和执行者不会接收到第一个执行后的第二个子任务。
任务响应,日志,父任务
响应
任务响应返回 run
方法的数据
####示例
<? $task = new Task("ParentJob"); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $sub_task_request_data = $task->get_request_data(); return $sub_task_request_data + 5; } }
####结果 获取响应: $task->response()->get_response()
Job1
响应: 10Job2
响应: 15Job3
响应: 20
日志
任务中的日志记录 ####示例
<? class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $arg1 = 1; $task->response()->set_log('Step 1'); $arg2 = 2; $task->response()->set_log('Step 2'); return $arg1+$arg2; } }
####结果
<? $logs = $task->response()->get_log();
返回:数组 array('Step 1', 'Step2');
访问父任务
方法 $task->parent_task()
####示例
<? $task = new Task("ParentTask", 5); $task ->set_exclusive(true) ->sub_tasks() ->add(new \PhpQueue\Task("Job1", 5)) ->add(new \PhpQueue\Task("Job2", 10)) ->add(new \PhpQueue\Task("Job3", 15)); class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { $sub_task_request_data = $task->get_request_data(); $parent_request_data = $task->parent_task()->get_request_data(); return $sub_task_request_data * $parent_request_data; } } class Job2 extends Job1 {} class Job3 extends Job1 {}
####结果 响应
Job1
响应: 25Job2
响应: 50Job3
响应: 75
任务和子任务中的错误
简单任务中的错误
在第一次异常后,任务得到状态: 错误
。您可以设置任务的最大错误尝试次数。
设置最大错误尝试次数 $task->settings()->set_error_max_trial($trials)
。其中 $trials
是最大尝试次数(int)。
获取当前尝试次数 $task->settings()->get_trial()
。返回:int,当前尝试次数。
####示例
<? $task = new Task("ExceptionTask"); $task->settings()->set_error_max_trial(5); class ExceptionTask implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { if($task->settings()->get_trial() < 3) { throw new Exception("Test exception"); } return 100; } }
####结果 响应
- 错误。状态 "新"
- 错误。状态 "新"
- 完成。
带子任务的任务中的错误
默认情况下,在最后一次错误尝试后,父任务得到状态 错误
。
如果需要继续执行子任务,则需要将 error break
设置为 false
设置错误中断标志 $task->settings()->set_error_break($flag)
。其中 $flag
是 bool。
####示例
<? $task = new Task("TaskWithOneExceptionSubTask"); $task-> subtasks() ->add(new \PhpQueue\Task("Job1")) ->add(new \PhpQueue\Task("ExceptionTask")) ->add(new \PhpQueue\Task("Job2")); class ExceptionTask implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class Job1 implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { return true; } } class Job2 extends Job1 {}
####错误中断标志 = false 的结果
Job1
状态完成
ExceptionTask
状态错误
TaskWithOneExceptionSubTask
状态错误
####错误中断标志 = true 的结果
Job1
状态完成
ExceptionTask
状态错误
Job2
状态完成
TaskWithOneExceptionSubTask
状态完成但有错误
嵌套设置
在父任务中设置,覆盖子任务的设置。子任务的设置会覆盖父任务的设置。
####示例
<? $task = new Task("ParentJob"); $task->settings()->set_error_max_trial(5); $job1 = new \PhpQueue\Task("Job1"); $job2 = new \PhpQueue\Task("Job2"); $job2->settings()->set_error_max_trial(2); $job3 = new \PhpQueue\Task("Job3"); $task->subtasks()->add(array($job1, $job2, $job3,));
####结果
Job1
最大尝试次数 5Job1
最大尝试次数 2Job1
最大尝试次数 5
错误日志
从任务获取异常日志:$task->response()->get_error()
返回:数组
回调和错误回调
回调
任务后执行回调。
PhpQueue 有两种类型的回调
- 回调
PhpQueue\Interfaces\ICallback
- 错误回调
PhpQueue\Interfaces\IErrorCallback
示例
<? $task = new Task("TaskWithCallback"); $task->set_callback("SimpleCallback"); class SimpleCallback implements PhpQueue\Interfaces\ICallback { public static function callback_run(Task $task) { echo "Callback"; } }
结果
执行了 TaskWithCallback
并在屏幕上显示 "Callback"。
错误回调
任务错误后执行错误回调
<? $task = new Task("TaskWithError"); $task ->set_callback("SimpleCallback") ->set_error_callback("ErrorCallback"); class TaskWithError implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback { public static function callback_error_run(Task $task) { echo "ERROR!"; } }
结果
执行了 TaskWithCallback
并在屏幕上显示 "ERROR!"。
父任务的回调
子任务和父任务都可以有回调。首先执行子任务回调,然后执行父任务回调。
示例
<? $task = new Task("TaskWithSubTasks"); $task->set_callback("ParentCallback") $job1 =new \PhpQueue\Task("Job"); $job1->set_callback("Callback"); $job2 =new \PhpQueue\Task("Job"); $job3 =new \PhpQueue\Task("Job"); $job4 =new \PhpQueue\Task("Job"); $job4->set_callback("Callback") $task->subtasks()->add(array($job1, $job2, $job3, $job4)); class Job implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { echo "Job"; } } class Callback implements PhpQueue\Interfaces\ICallback { public static function callback_run(Task $task) { echo "Callback"; } }
结果
显示在屏幕上
Job
Callback
ParentCallback
Job
ParentCallback
Job
ParentCallback
Job
Callback
ParentCallback
全局回调
任务执行者可以调用 Callback 和 Error 回调
示例
<? $task = new Task("TaskWithException"); class TaskWithException implements PhpQueue\Interfaces\IJob { public static function run(Task $task) { throw new Exception("Test exception"); } } class ErrorCallback implements PhpQueue\Interfaces\IErrorCallback { public static function callback_error_run(Task $task) { echo "GLOBAL ERROR!"; } } $task_performer = new TaskPerformer(); $task_performer->set_global_error_callback("ErrorCallback"); $task_performer->execute_task($task);
结果
显示在屏幕上 "GLOBAL ERROR!"
回调中的错误
回调任务中的错误后,任务获得状态:CALLBACK ERROR
而不执行其他回调。
回调执行优先级
回调
- 子任务回调/错误回调
- 父任务回调/错误回调
- 全局回调/错误回调
队列驱动程序
PhpQueue 支持
- MySql
- PostgreSQL
- SQLite
- 文件
未来:Mongo, Redis
SqlPdoDriver
要求: PDO
示例
创建基于 MySQL 的队列
<? $pdo = new \PDO("mysql:host=localhost;dbname=queue", "root", ""); $driver = new SqlPdoDriver($pdo); $queue = new Queue($driver);
FileDriver
要求: SimpleXMLElement
示例
创建基于文件的队列
<? $driver = new SqlPdoDriver('queue_folder'); $queue = new Queue($driver);
Autoloader
示例
<? require_once __DIR__ . "/../src/PhpQueue/AutoLoader.php"; use PhpQueue\AutoLoader; AutoLoader::RegisterDirectory(array('Callbacks', 'Tasks/Example')); AutoLoader::RegisterNamespaces(array('PhpQueue' => '../src/PhpQueue')); AutoLoader::RegisterAutoLoader();
Web界面
安装
只需将队列文件从 web
文件夹复制到 web 服务器文件夹。
驱动连接
在文件 web/index.php
中找到此行
<? $web_driver = include(DRIVERS_PATH . "SqlWebDriver.php");
并更改路径到具有驱动程序的文件(如果需要的话)。
###驱动连接属性
在 drivers
文件夹中的驱动文件中的连接属性。
####示例 SqlWebDriver 的连接属性
<? define("QUEUE_HOST", "localhost"); define("QUEUE_DATABASE", "queue"); define("QUEUE_USER", "root"); define("QUEUE_PASSWORD", ""); define("QUEUE_TABLE", "queue_tasks");
自定义
您可以自定义网页界面
PhpQueue 有 4 种类型的网页界面
- 列表
- 详细信息
- 日志
- 错误日志
需要自定义任务时,需要创建一个与任务名称相同的文件夹。
####示例
<? //Task class class JobLog implements Interfaces\IJob { public static function run(Task $task) { for($i=0; $i<6; $i++) { $task->response()->set_log(ceil(rand(0, 100) / 10)); } return $task->get_request_data()*2; } } // /web/templates/task_render/JobLog/log.php foreach($list as $_id => $log_string){ $performed = (int)$log_string; $not_performed = 10 - $performed; echo "Fork {$_id} <span style='color:#009944'>" . str_repeat("#", $performed) . "</span><span style='color:#ff0000'>" . str_repeat("#",$not_performed) . "</span><br>"; } // /web/templates/task_render/JobLog/list.php /** * @var array $task */ use PhpQueue\Task; use PhpQueue\TaskConst; ?> <div class="row"> <div class="col-md-12"> <span class="pull-right label label-<?= TaskModel::$class_by_status[$task[TaskConst::STATUS]] ?>"> <?= TaskModel::$status_text[$task[TaskConst::STATUS]] ?> <? if ($task[TaskConst::STATUS] == Task::STATUS_IN_PROCESS && $task[TaskConst::SUBTASKS_QUANTITY] > 0) { ?> <?= round(100 - (int)$task[TaskConst::SUBTASKS_QUANTITY_NOT_PERFORMED] / ((int)$task[TaskConst::SUBTASKS_QUANTITY] / 100)); ?>% <? } ?> </span> <h1 class="list-group-item-heading" style="text-align: center; color: #3a87ad"> <?= $task[TaskConst::TASK_NAME] ?> </h1> </div> </div>
结果
自定义列表
自定义日志