底板 / leptir
后台任务处理器
Requires
- php: >=5.4.0
- zendframework/zend-config: 2.*
- zendframework/zend-http: 2.*
- zendframework/zendframework: 2.*
Requires (Dev)
- amazonwebservices/aws-sdk-for-php: dev-master
- aws/aws-sdk-php: *
- phpunit/phpunit: 3.7.*
- predis/predis: 1.1.*@dev
This package is not auto-updated.
Last update: 2016-02-14 07:47:10 UTC
README
Leptir 是一个异步、高度可扩展的任务处理器,可以使用多个代理(队列)并行处理。可以为不同的代理定义不同的优先级。
Leptir 配置
return array( 'leptir' => array( 'brokers' => array( array( 'type' => 'mongodb', 'connection' => array( 'collection' => 'tasksp1', 'host' => 'localhost', 'port' => 27017, 'database' => 'leptir', 'options' => array( 'secondaryPreferred' => true ) ), 'configuration' => array( 'priority' => 1, 'task_count_caching_time' => 0.6 ) ), array( 'type' => 'mongodb', 'connection' => array( 'collection' => 'tasksp2' // default values will be used ), 'configuration' => array( 'priority' => 2 ) ) ), 'loggers' => array( // loggers configuration explained later ), 'daemon' => array( 'configuration' => array( 'task_execution_time' => 600, 'number_of_workers' => 4, 'empty_queue_sleep_time' => 0.5, 'workers_active_sleep_time' => 0.2 ) ), 'meta_storage' => array( array( //storage configuration ), array( // storage configuraion ) ), 'error_reporting' => array( array( 'class' => 'Application\ErrorReporting\ErrorReporting' ) ) ) )
Leptir 配置参数说明
task_execution_time
- 最大任务执行时间。如果任务在此时间内未完成,则将被终止。可以通过将任务推送到代理队列时为每个任务单独覆盖最大执行时间。number_of_workers
- 每个Leptir实例的最大活动工作者数。empty_queue_sleep_time
- 队列为空时休眠的时间(0 - 不休眠)--- 减少了代理的查询次数。(秒,支持浮点数)workers_active_sleep_time
- 当所有工作者都忙碌时休眠的时间(0 - 不休眠)--- 减少了代理的查询次数。(秒,支持浮点数)
参数 empty_queue_sleep_time
和 workers_active_sleep_time
用于减少代理的查询次数(对于查询次数影响价格的 SQS 代理更有用)。
配置可以包含多个代理的定义,这些代理将同时使用。代理也可以有不同的优先级。优先级索引较低的代理有更高的概率其任务被选中。
配置还可以包含多个任务元信息存储的定义。
代理
代理本质上是一些队列,它们持有未处理的任务。每个代理都有调度支持。每个代理也分配了优先级(默认优先级为0)。目前支持的代理
- MongoDB
- Amazon SQS
- Redis
可以通过使用具有不同优先级级别的多个代理来对任务进行优先级排序。
$broker = new Broker($brokersConfiguration); $task = new TestTask( array( 'param1' => 'First parameter' 'paramFloat' => 3.89, 'randomIntParam' => 10 ) ); $broker->pushTask($task, null, 1);
代理方法
public function pushTask(BaseTask $task, \DateTime $timeOfExecution = null, $priority = 0, $timeLimit = 0)
方法用于将新任务推送到队列中。
每个代理都有一些配置选项
priority
: 代理优先级级别(数字越小优先级越高)task_count_caching_time
- 队列中当前任务数量信息的缓存时间。用于减少对数据库/SQS 的请求次数。
配置示例
array( 'type' => 'mongodb', 'connection' => array( 'host' => 'localhost', 'port' => 27017, 'database' => 'leptir' 'collection' => 'tasks', 'options' => array( 'connect' => true, 'secondaryPreferred' => true ) ), 'configuration' => array( 'priority' => 0 ) )
此配置也是默认配置。在创建代理之前,默认配置将与用户定义的配置合并。
Amazon SQS 代理配置示例
array( 'type' => 'sqs', 'connection' => array( 'sqs_key' => 'SQS KEY', 'sqs_secret' => 'SQS SECRET', 'sqs_queue' => 'QUEUE URI' ), 'configuration' => array( 'priority' => 0 ) )Redis 代理
配置示例
array( 'type' => 'redis', 'connection' => array( 'scheme' => 'tcp', 'host' => 'localhost', 'port' => 6379, 'database' => 0, 'key' => 'leptir:ztasks' ), 'configuration' => array( 'priority' => 0 ) )
此配置也是默认配置。
任务元存储
用于存储有关已执行任务的信息。可以定义多个元信息存储。
目前支持的存储
- MongoDB
- Redis
任务信息格式
{ "status" : 3, "retC" : 1, "exTime" : 0.0005939006805419922, "respM" : "Test task done.", "type" : "Leptir\\Task\\Test\\TestTask", "exStart" : ISODate("2013-12-04T02:43:35Z"), "_id" : "10577529e96d5eb0225.50827478" }
status
- 任务当前状态(1 - 等待,2 - 进行中,3 - 完成)retC
- 任务返回的状态码(1 - 成功,2 - 警告,3 - 错误,4 - 未知,5 - 超出时间限制)exTime
- 执行时间respM
- 任务的响应消息type
- 任务类名称exStart
- 任务执行开始时间_id
- 唯一任务ID(由代理生成)
配置
array( 'connection' => array( 'host' => 'localhost', 'port' => 27017, 'database' => 'leptir' 'collection' => 'info' 'options' => array( // mongo connection options ) ), 'configuration' => array( ) )Redis元存储
Redis元存储存储有关每个任务的键值对信息,其中键是任务ID,值是JSON编码的任务信息数组。
配置
array( 'connection' => array( 'scheme' => 'tcp', 'host' => 'localhost', 'port' => 6379, 'database' => 0 ), 'configuration' => array( 'expire_after_seconds' => 86400 ) )
日志记录器
Leptir守护进程的日志记录器可以在配置文件中定义。可用的日志记录器
- 文件日志记录器
- STDOUT日志记录器
- STDERR日志记录器
Leptir使用Zend\Logging库生成日志。配置示例
'loggers' => array( 'logfile' => array( 'type' => 'file', 'options' => array( 'path' => '/var/log/leptir.log' ) ), 'stdoutlog' => array( 'type' => 'stdout' ) )
logfile
和 stdoutlog
是名称(仅用于可读性)。STDOUT和STDERR日志记录器不需要额外的选项。文件日志记录器的 path
是必需的。
错误报告
Leptir将捕获所有异常和错误并将此信息包含到元信息中。有时需要将每个错误报告给第三方服务以便更容易跟踪。我们可以定义多个错误报告实现,并配置Leptir使用这些实现将每个错误和异常转发。每个错误报告对象的实现都必须实现 Leptir\ErrorReport\ErrorReportInterface
。配置示例
'error_reporting' => array( array( 'class' => 'Application\ErrorReporting\ErrorReporting' ) )
ErrorReportingInterface 包含方法
reportException(\Exception $ex)
reportMessage($message)
addErrorData($errorData)
错误数据(额外的错误信息)不会自动清除。错误报告实现需要在每个报告时手动清除数据(如果需要的话)。
任务
每个任务都必须扩展抽象类 Leptir\Task\BaseTask
。需要实现的方法是受保护的 doJob
方法。
简单任务示例
<?php namespace Leptir\Task\Test; use Leptir\Task\BaseTask; /** * Simple task used for testing purpose. This task will not anything smart, it will just sleep for * random amount of seconds (between 6 and 19) * * Class SlowTask * @package Leptir\Task\Test */ class SlowTask extends BaseTask { protected function doJob() { $sleepTime = rand(6, 19); $this->logInfo('Sleeping for '. $sleepTime); sleep($sleepTime); $this->addResponseLine('Task had a great nap'); return self::EXIT_SUCCESS; } }
可以覆盖的附加方法,以实现对任务执行的扩展控制。
protected function beforeStart()
- 此方法将在 doJob 方法之前执行protected function afterFinish()
- 此方法将在 doJob 方法之后执行public function getAdditionalMetaInfo()
- 该方法应返回要保存为任务信息的关联数组(如之前描述的任务信息后端部分)
BaseTask类还实现了几个方法,可以在实现自定义任务时使用
- 获取传递给任务参数的方法:
protected function getInt($paramName, $defaultValue = null)
protected function getString($paramName, $defaultValue = null)
protected function getFloat($paramName, $defaultValue = null)
protected function getArray($paramName, $defaultValue = null)
当参数类型不匹配请求的类型时,方法将抛出 Leptir\Exception\LeptirInputException。
- 当前执行时间 - 有时任务实现需要任务执行时间(例如,如果运行时间过长则执行某些操作)
protected function getExecutionTime()
该方法将返回表示执行时间的浮点数。
- 日志记录方法
protected function logInfo($message)
protected function logError($message)
protected function logWarning($message)
protected function logDebug($message)
- 执行流方法 - 可以跟踪任务执行流程 - 帮助编写用户编写任务的单元测试
changeState($state)
- 改变执行当前状态getLastState()
- 获取任务最后所在的状态getExecutionFlow()
- 返回一个数组,其中包含任务运行时所在的状态列表
命令行操作
启动Leptirphp index.php leptir start [--daemon] [--config=] [--pid=]
Leptir可以作为守护进程启动。当Leptir以守护进程运行时,只有文件日志记录器会激活。
停止Leptirphp index.php leptir stop [--pid=]
用于停止Leptir进程的命令。
将Leptir安装为服务php index.php leptir install [--config=] [--pid=] [--php_path=]
用于将Leptir安装为服务的命令(创建 /etc/ini.d/leptir
文件)。文件内容
#!/bin/bash PID_PATH='{{PID_PATH}}' case "$1" in start) # check if PID file exists if [ -f $PID_PATH ]; then pid="`cat $PID_PATH`" if ps $pid > /dev/null then echo -e "\e[31mLeptir is already flying on this box.\e[0m" exit 1 else echo -e "\e[33m.pid file is there, but process is not running. Cleaning .pid file and starting the process.\e[0m" rm -f "$PID_PATH" fi fi echo -e "\e[32mStarting a little butterfly. Fly buddy, fly!\e[0m" {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir start --config={{CONFIG_PATH}} --daemon --pid $PID_PATH ;; stop) echo -ne "Stopping a little butterfly. You'll have to wait for all the tasks to finish though.\n" {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir stop --pid $PID_PATH while [ -f $PID_PATH ]; do sleep 1 echo -ne "." done echo ;; *) echo "Usage: $0 (start|stop)" exit 1 esac exit 0