底板/leptir

此包已被弃用且不再维护。未建议替代包。

后台任务处理器

dev-master 2015-11-16 19:12 UTC

This package is not auto-updated.

Last update: 2016-02-14 07:47:10 UTC


README

Leptir 是一个异步、高度可扩展的任务处理器,可以使用多个代理(队列)并行处理。可以为不同的代理定义不同的优先级。

Build Status

Leptir 配置

    return array(
        'leptir' => array(
            'brokers' => array(
                   array(
                    'type' => 'mongodb',
                    'connection' => array(
                            'collection' => 'tasksp1',
                            'host' => 'localhost',
                            'port' => 27017,
                            'database' => 'leptir',
                            'options' => array(
                                'secondaryPreferred' => true
                            )

                    ),
                    'configuration' => array(
                            'priority' => 1,
                            'task_count_caching_time' => 0.6
                    )
                ),
                array(
                    'type' => 'mongodb',
                    'connection' => array(
                            'collection' => 'tasksp2'
                            // default values will be used
                    ),
                    'configuration' => array(
                            'priority' => 2
                    )
                )
            ),
            'loggers' => array(
                // loggers configuration explained later
            ),
            'daemon' => array(
                'configuration' => array(
                    'task_execution_time' => 600,
                    'number_of_workers' => 4,
                    'empty_queue_sleep_time' => 0.5,
                    'workers_active_sleep_time' => 0.2
                )
            ),
            'meta_storage' => array(
                array(
                    //storage configuration
                ),
                array(
                    // storage configuraion
                )
            ),
            'error_reporting' => array(
                array(
                    'class' => 'Application\ErrorReporting\ErrorReporting'
                )
            )
        )
    )

Leptir 配置参数说明

  • task_execution_time - 最大任务执行时间。如果任务在此时间内未完成,则将被终止。可以通过将任务推送到代理队列时为每个任务单独覆盖最大执行时间。
  • number_of_workers - 每个Leptir实例的最大活动工作者数。
  • empty_queue_sleep_time - 队列为空时休眠的时间(0 - 不休眠)--- 减少了代理的查询次数。(秒,支持浮点数)
  • workers_active_sleep_time - 当所有工作者都忙碌时休眠的时间(0 - 不休眠)--- 减少了代理的查询次数。(秒,支持浮点数)

参数 empty_queue_sleep_timeworkers_active_sleep_time 用于减少代理的查询次数(对于查询次数影响价格的 SQS 代理更有用)。

配置可以包含多个代理的定义,这些代理将同时使用。代理也可以有不同的优先级。优先级索引较低的代理有更高的概率其任务被选中。

配置还可以包含多个任务元信息存储的定义。

代理

代理本质上是一些队列,它们持有未处理的任务。每个代理都有调度支持。每个代理也分配了优先级(默认优先级为0)。目前支持的代理

  • MongoDB
  • Amazon SQS
  • Redis

可以通过使用具有不同优先级级别的多个代理来对任务进行优先级排序。

代理使用示例

    $broker = new Broker($brokersConfiguration);
    $task = new TestTask(
        array(
            'param1' => 'First parameter'
            'paramFloat' => 3.89,
            'randomIntParam' => 10
        )
    );
    $broker->pushTask($task, null, 1);

代理方法

    public function pushTask(BaseTask $task, \DateTime $timeOfExecution = null, $priority = 0, $timeLimit = 0)

方法用于将新任务推送到队列中。

每个代理都有一些配置选项

  • priority: 代理优先级级别(数字越小优先级越高)
  • task_count_caching_time - 队列中当前任务数量信息的缓存时间。用于减少对数据库/SQS 的请求次数。
MongoDB 代理

配置示例

    array(
        'type' => 'mongodb',
        'connection' => array(
            'host' => 'localhost',
            'port' => 27017,
            'database' => 'leptir' 
            'collection' => 'tasks',
            'options' => array(
                'connect' => true,
                'secondaryPreferred' => true
            )
        ),
        'configuration' => array(
            'priority' => 0
        )
    )    

此配置也是默认配置。在创建代理之前,默认配置将与用户定义的配置合并。

Amazon SQS 代理

配置示例

    array(
        'type' => 'sqs',
        'connection' => array(
            'sqs_key' => 'SQS KEY',
            'sqs_secret' => 'SQS SECRET',
            'sqs_queue' => 'QUEUE URI'
        ),
        'configuration' => array(
            'priority' => 0
        )
     ) 
Redis 代理

配置示例

    array(
        'type' => 'redis',
        'connection' => array(
            'scheme' => 'tcp',
            'host' => 'localhost',
            'port' => 6379,
            'database' => 0,
            'key' => 'leptir:ztasks'
        ),
        'configuration' => array(
            'priority' => 0
        )
     ) 

此配置也是默认配置。

任务元存储

用于存储有关已执行任务的信息。可以定义多个元信息存储。

目前支持的存储

  • MongoDB
  • Redis
Mongo 元存储

任务信息格式

{
    "status" : 3,
    "retC" : 1,
    "exTime" : 0.0005939006805419922,
    "respM" : "Test task done.",
    "type" : "Leptir\\Task\\Test\\TestTask",
    "exStart" : ISODate("2013-12-04T02:43:35Z"),
    "_id" : "10577529e96d5eb0225.50827478"
}
  • status - 任务当前状态(1 - 等待,2 - 进行中,3 - 完成)
  • retC - 任务返回的状态码(1 - 成功,2 - 警告,3 - 错误,4 - 未知,5 - 超出时间限制)
  • exTime - 执行时间
  • respM - 任务的响应消息
  • type - 任务类名称
  • exStart - 任务执行开始时间
  • _id - 唯一任务ID(由代理生成)

配置

    array(
        'connection' => array(
            'host' => 'localhost',
            'port' => 27017,
            'database' => 'leptir'
            'collection' => 'info'
            'options' => array(
                // mongo connection options
            )
        ),
        'configuration' => array(

        )
    )
Redis元存储

Redis元存储存储有关每个任务的键值对信息,其中键是任务ID,值是JSON编码的任务信息数组。

配置

    array(
        'connection' => array(
            'scheme' => 'tcp',
            'host' => 'localhost',
            'port' => 6379,
            'database' => 0
        ),
        'configuration' => array(
            'expire_after_seconds' => 86400
        )
    )

日志记录器

Leptir守护进程的日志记录器可以在配置文件中定义。可用的日志记录器

  • 文件日志记录器
  • STDOUT日志记录器
  • STDERR日志记录器

Leptir使用Zend\Logging库生成日志。配置示例

    'loggers' => array(
            'logfile' => array(
                'type' => 'file',
                'options' => array(
                    'path' => '/var/log/leptir.log'
                )
            ),
            'stdoutlog' => array(
                'type' => 'stdout'
            )
        )

logfilestdoutlog 是名称(仅用于可读性)。STDOUT和STDERR日志记录器不需要额外的选项。文件日志记录器的 path 是必需的。

错误报告

Leptir将捕获所有异常和错误并将此信息包含到元信息中。有时需要将每个错误报告给第三方服务以便更容易跟踪。我们可以定义多个错误报告实现,并配置Leptir使用这些实现将每个错误和异常转发。每个错误报告对象的实现都必须实现 Leptir\ErrorReport\ErrorReportInterface。配置示例

    'error_reporting' => array(
        array(
            'class' => 'Application\ErrorReporting\ErrorReporting'
        )
    )

ErrorReportingInterface 包含方法

  • reportException(\Exception $ex)
  • reportMessage($message)
  • addErrorData($errorData)

错误数据(额外的错误信息)不会自动清除。错误报告实现需要在每个报告时手动清除数据(如果需要的话)。

任务

每个任务都必须扩展抽象类 Leptir\Task\BaseTask。需要实现的方法是受保护的 doJob 方法。

简单任务示例

<?php

namespace Leptir\Task\Test;

use Leptir\Task\BaseTask;

/**
 * Simple task used for testing purpose. This task will not anything smart, it will just sleep for
 * random amount of seconds (between 6 and 19)
 *
 * Class SlowTask
 * @package Leptir\Task\Test
 */
class SlowTask extends BaseTask
{
    protected function doJob()
    {
        $sleepTime = rand(6, 19);
        $this->logInfo('Sleeping for '. $sleepTime);
        sleep($sleepTime);

        $this->addResponseLine('Task had a great nap');
        return self::EXIT_SUCCESS;
    }
}

可以覆盖的附加方法,以实现对任务执行的扩展控制。

  • protected function beforeStart() - 此方法将在 doJob 方法之前执行
  • protected function afterFinish() - 此方法将在 doJob 方法之后执行
  • public function getAdditionalMetaInfo() - 该方法应返回要保存为任务信息的关联数组(如之前描述的任务信息后端部分)

BaseTask类还实现了几个方法,可以在实现自定义任务时使用

  • 获取传递给任务参数的方法:
    protected function getInt($paramName, $defaultValue = null)
    protected function getString($paramName, $defaultValue = null)
    protected function getFloat($paramName, $defaultValue = null)
    protected function getArray($paramName, $defaultValue = null)

当参数类型不匹配请求的类型时,方法将抛出 Leptir\Exception\LeptirInputException

  • 当前执行时间 - 有时任务实现需要任务执行时间(例如,如果运行时间过长则执行某些操作)
    protected function getExecutionTime()

该方法将返回表示执行时间的浮点数。

  • 日志记录方法
    protected function logInfo($message)
    protected function logError($message)
    protected function logWarning($message)
    protected function logDebug($message)
  • 执行流方法 - 可以跟踪任务执行流程 - 帮助编写用户编写任务的单元测试
  • changeState($state) - 改变执行当前状态
  • getLastState() - 获取任务最后所在的状态
  • getExecutionFlow() - 返回一个数组,其中包含任务运行时所在的状态列表

命令行操作

启动Leptir
    php index.php leptir start [--daemon] [--config=] [--pid=]

Leptir可以作为守护进程启动。当Leptir以守护进程运行时,只有文件日志记录器会激活。

停止Leptir
    php index.php leptir stop [--pid=]

用于停止Leptir进程的命令。

将Leptir安装为服务
    php index.php leptir install  [--config=] [--pid=] [--php_path=]

用于将Leptir安装为服务的命令(创建 /etc/ini.d/leptir 文件)。文件内容

#!/bin/bash
PID_PATH='{{PID_PATH}}'

case "$1" in
start)
        # check if PID file exists
        if [ -f $PID_PATH ]; then
                pid="`cat $PID_PATH`"
                if ps $pid > /dev/null
                then
                        echo -e "\e[31mLeptir is already flying on this box.\e[0m"
                        exit 1
                else
                        echo -e "\e[33m.pid file is there, but process is not running. Cleaning .pid file and starting the process.\e[0m"
                        rm -f "$PID_PATH"
                fi
        fi
        echo -e "\e[32mStarting a little butterfly. Fly buddy, fly!\e[0m"
    {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir start  --config={{CONFIG_PATH}} --daemon --pid $PID_PATH
;;
stop)
    echo -ne "Stopping a little butterfly. You'll have to wait for all the tasks to finish though.\n"
        {{PHP_PATH}}php {{ROOT_PATH}}/public/index.php leptir stop --pid $PID_PATH
        while [ -f $PID_PATH ];
        do
                sleep 1
                echo -ne "."
        done
        echo
;;
*)
    echo "Usage: $0 (start|stop)"
    exit 1
esac

exit 0