advissmedia/yii2-daemon

扩展提供了创建和控制简单守护进程的功能

安装: 85

依赖项: 0

建议者: 0

安全性: 0

星星: 2

关注者: 1

分支: 34

类型:yii2-extension

0.2.7 2019-10-14 10:16 UTC

This package is not auto-updated.

Last update: 2024-09-24 09:52:24 UTC


README

扩展提供了创建和控制简单守护进程的功能。还包括守护进程监督器。

安装

安装此扩展的最佳方式是通过composer

运行以下命令之一:

php composer.phar require --prefer-dist advissmedia/yii2-daemon "*"

或者添加以下内容到您的composer.json文件的require部分:

"advissmedia/yii2-daemon": "*"

设置WatcherDaemon(监督器)

WatcherDaemon是主要的守护进程,提供从盒子里开始的功能。这个守护进程检查其他守护进程并运行,如果需要的话。执行以下步骤:

  1. 在您的控制台控制器路径中创建一个文件,例如DaemonsSupervisorController.php,内容如下
<?php

namespace console\controllers;
use \advissmedia\daemon\controllers\WatcherDaemonController;

class DaemonsSupervisorController extends WatcherDaemonController
{
    /**
     * @return array
     */
    protected function getDaemonsList()
    {
        //TODO: modify list, or get it from config, it does not matter
        $daemons = [
            ['className' => 'FirstDaemonController', 'enabled' => true, 'hardKill' => false],
            ['className' => 'AnotherDaemonController', 'enabled' => false, 'hardKill' => true]
        ];
        return $daemons;
    }
}

或者从文件中获取这个作业数组,例如json数组

<?php

namespace console\controllers;
use \advissmedia\daemon\controllers\WatcherDaemonController;

class DaemonsSupervisorController extends WatcherDaemonController
{
    /**
     * @return array
     */
    protected function getDaemonsList()
    {
        $string = @file_get_contents("/url/or/uri/to/json/file.json");
        if ($string === false || empty($daemons = json_decode($string,true)))
            return [];
        return $daemons;
    }
}

并使用file.json格式

[
   {
	"daemon":"FirstDaemonController",
	"enabled":true,
	"hardKill":false
   },
   {
	"daemon":"AnotherDaemonController",
	"enabled":false,
	"hardKill":false
   }
]

或者从数据库中获取运行中的守护进程列表

  1. 没有人检查Watcher。Watcher应该持续运行。将其添加到您的crontab中
* * * * * /path/to/yii/project/yii watcher-daemon --demonize=1

Watcher不能启动两次,一次只能有一个实例在工作。

用法

创建新的守护进程

  1. 在您的控制台控制器路径中创建一个文件{NAME}DaemonController.php,内容如下
<?php

namespace console\controllers;

use \advissmedia\daemon\DaemonController;

class {NAME}DaemonController extends DaemonController
{
    /**
     * @return array
     */
    protected function defineJobs()
    {
        /*
        TODO: return task list, extracted from DB, queue managers and so on. 
        Extract tasks in small portions, to reduce memory usage.
        */
    }
    /**
     * @return jobtype
     */
    protected function doJob($job)
    {
        /*
        TODO: implement you logic
        Don't forget to mark task as completed in your task source
        */
    }


    const DEF_FIRST_JOB = 1;
    const DEF_SOCOND_JOB = 3;

    /**
     * {@inheritdoc}
     */
    public function init()
    {
        parent::init();
        /*
         * Set type of jobs list can be processed
         */
        $this->isJobsListPersistent = true;
        /*
         * Array connection name for renew connections for daemon process
         */
        $this->connections = ['pgdb', 'sqltdb'];
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
        /*
        TODO: return task list, extracted from DB, queue managers or jast array. 
        Extract tasks in small portions, to reduce memory usage.
        */

        return [
            ['job_id' => self::DEF_FIRST_JOB],
            ['job_id' => self::DEF_SECOND_JOB]
        ];
    }

    /**
     * @param array $job
     * @return bool
     * @throws \Exception
     * @throws \Throwable
     * @throws \yii\db\Exception
     */
    protected function doJob($job)
    {
        /*
        TODO: implement you logic
        Don't forget to mark task as completed in your task source
        */

        if (array_key_exists('job_id', $job)) {
            switch ($job['job_id']) {
                case self::DEF_FIRST_JOB:
                    $ret = $this->processFirstJob();
                    break;
                case self::DEF_SECOND_JOB:
                    $ret = $this->processSecondJob();
                    break;
                default:
                    $ret = false;
            }
            return $ret;
        }
        return false;
    }
}
  1. 实现逻辑。
  2. 将新的守护进程添加到Watcher的守护进程列表中。

与RabbitMQ交互(此示例需要"videlalvaro/php-amqplib"包)

<?php

namespace console\controllers\daemons;

use console\components\controllers\BaseDaemonController;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class SomeRabbitQueueController
 */
class SomeRabbitQueueController extends BaseDaemonController
{
    /**
     *  @var $connection AMQPStreamConnection
     */
    protected $connection;

    /**
     *  @var $connection AMQPChannel
     */
    protected $channel;


    /**
     * @return array|bool
     */
    protected function defineJobs()
    {
        $channel = $this->getQueue();
        while (count($channel->callbacks)) {
            try {
                $channel->wait(null, true, 5);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $timeout) {

            } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $runtime) {
                \Yii::error($runtime->getMessage());
                $this->channel = null;
                $this->connection = null;
            }
        }
        return false;
    }

    /**
     * @param AMQPMessage $job
     * @return bool
     * @throws NotSupportedException
     * @SuppressWarnings(PHPMD.UnusedFormalParameter)
     */
    protected function doJob($job)
    {
        $result = false;

        //do somethink here and set $result

        if ($result) {
            $this->ask($job);
        } else {
            $this->nask($job);
        }
        return $result;
    }


    /**
     * @return AMQPChannel
     * @throws InvalidParamException
     */
    protected function getQueue()
    {
        if ($this->channel == null) {
            if ($this->connection == null) {
                if (isset(\Yii::$app->params['rabbit'])) {
                    $rabbit = \Yii::$app->params['rabbit'];
                } else {
                    throw new InvalidParamException('Bad config RabbitMQ');
                }
                $this->connection = new AMQPStreamConnection($rabbit['host'], $rabbit['port'], $rabbit['user'], $rabbit['password']);
            }

            $this->channel = $this->connection->channel();

            $this->channel->exchange_declare($this->exchange, $this->type, false, true, false);

            $args = [];

            if ($this->dlx) {
                $args['x-dead-letter-exchange'] = ['S', $this->exchange];
                $args['x-dead-letter-routing-key'] = ['S',$this->dlx];
            }
            if ($this->max_length) {
                $args['x-max-length'] = ['I', $this->max_length];
            }
            if ($this->max_bytes) {
                $args['x-max-length-bytes'] = ['I', $this->max_bytes];
            }
            if ($this->max_priority) {
                $args['x-max-priority'] = ['I', $this->max_priority];
            }

            list($queue_name, ,) = $this->channel->queue_declare($this->queue_name, false, true, false, false, false, $args);

            foreach ($this->binding_keys as $binding_key) {
                $this->channel->queue_bind($queue_name, $this->exchange, $binding_key);
            }

            $this->channel->basic_consume($queue_name, '', false, false, false, false, [$this, 'doJob']);
        }

        return $this->channel;
    }


    /**
     * @param $job
     */
    protected function ask($job)
    {
        $job->delivery_info['channel']->basic_ack($job->delivery_info['delivery_tag']);
    }

    /**
     * @param $job
     */
    protected function nask($job)
    {
        $job->delivery_info['channel']->basic_nack($job->delivery_info['delivery_tag']);
    }
}

守护进程设置(属性)

在您的守护进程中,您可以覆盖父属性

  • $demonize - 如果为0,守护进程不会作为守护进程运行,而仅作为简单的控制台应用程序。这需要调试。
  • $memoryLimit - 如果守护进程达到此限制,则守护进程停止工作。这可以防止内存泄漏。停止WatcherDaemon后再次运行此守护进程。
  • $sleep - 检查新任务之间的延迟,如果任务列表已满,守护进程不会休眠。
  • $pidDir - 守护进程pid所在的目录
  • $logDir - 守护进程日志所在的目录
  • $isMultiInstance - 此选项允许守护进程为每个任务创建自我副本。这意味着守护进程可以同时执行多个任务。这在需要一些时间和服务器资源执行许多此类任务时很有用。
  • $maxChildProcesses - 只有在$isMultiInstance=true的情况下。守护进程实例的最大数量。如果达到最大数量,则系统会等待至少一个子进程终止。
  • $isJobsListPersistent - 更改作业列表处理方法。如果为false,则作业列表处理始终在存在作业和空闲工作者时进行。如果为true,则从第一个作业到最后一个作业处理作业列表,并在工作器休眠指定时间后再次开始处理第一个作业。

守护进程控制

守护进程捕获某些信号并执行某些操作。以下是捕获的信号列表以及守护进程在接收到这些信号时的操作

  • SIGINTSIGTERM - 守护进程关闭,从循环中跳出并写入日志数据,并关闭已使用的资源。
  • SIGCHLD - 关闭所有子工作者,主守护进程进程仍在工作。
  • SIGUSR1 - 将有关守护进程运行时间的Linux系统日志信息写入。

记录器设置

如果您想更改日志设置,可以覆盖initLogger函数。示例

    /**
     * Adjusting logger. You can override it.
     */
    protected function initLogger()
    {

        $targets = \Yii::$app->getLog()->targets;
        foreach ($targets as $name => $target) {
            $target->enabled = false;
        }
        $config = [
            'levels' => ['error', 'warning', 'trace', 'info'],
            'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->shortClassName() . '.log',
            'logVars'=>[], // Don't log all variables
            'enableRotation' => true, // Enable log rotation if log file exceed 10Mb as default
            'exportInterval'=>1, // Write each message to disk
            'except' => [
                'yii\db\*', // Don't include messages from db
            ],
        ];
        $targets['daemon'] = new \yii\log\FileTarget($config);
        \Yii::$app->getLog()->targets = $targets;
        \Yii::$app->getLog()->init();
        // Flush each message
        \Yii::$app->getLog()->flushInterval = 1;
    }

在PHP < 5.5.0上安装Proctitle

您需要在服务器上安装proctitle扩展才能安装yii2-daemon。对于debian 7,可以使用以下命令

# pecl install channel://pecl.php.net/proctitle-0.1.2
# echo "extension=proctitle.so" > /etc/php5/mods-available/proctitle.ini
# php5enmod proctitle