citysites / yii2-daemon
扩展提供简单守护进程创建和控制的函数
0.2.6
2018-07-22 13:25 UTC
Requires
- ext-pcntl: *
- ext-posix: *
- yiisoft/yii2: *
README
扩展提供简单守护进程创建和控制的函数。还有一个守护进程监督器。
安装
安装此扩展的首选方式是通过 composer。
运行以下命令之一:
php composer.phar require --prefer-dist advissmedia/yii2-daemon "*"
或者在您的 composer.json
文件的 require 部分添加:
"advissmedia/yii2-daemon": "*"
设置 WatcherDaemon(监督器)
WatcherDaemon 是主要的守护进程,提供开箱即用的功能。此守护进程检查其他守护进程并运行,如果需要。按照以下步骤操作:
- 在您的控制台控制器路径中创建文件,例如 DaemonsSupervisorController.php,内容如下:
<?php
namespace console\controllers;
use \advissmedia\daemon\controllers\WatcherDaemonController;
class DaemonsSupervisorController extends WatcherDaemonController
{
/**
* @return array
*/
protected function getDaemonsList()
{
//TODO: modify list, or get it from config, it does not matter
$daemons = [
['className' => 'FirstDaemonController', 'enabled' => true, 'hardKill' => false],
['className' => 'AnotherDaemonController', 'enabled' => false, 'hardKill' => true]
];
return $daemons;
}
}
或者从文件中获取此作业数组,例如 JSON 数组
<?php
namespace console\controllers;
use \advissmedia\daemon\controllers\WatcherDaemonController;
class DaemonsSupervisorController extends WatcherDaemonController
{
/**
* @return array
*/
protected function getDaemonsList()
{
$string = @file_get_contents("/url/or/uri/to/json/file.json");
if ($string === false || empty($daemons = json_decode($string,true)))
return [];
return $daemons;
}
}
并且 file.json
格式
[
{
"daemon":"FirstDaemonController",
"enabled":true,
"hardKill":false
},
{
"daemon":"AnotherDaemonController",
"enabled":false,
"hardKill":false
}
]
或者从数据库中获取正在运行的守护进程列表
- 没有人在检查 Watcher。Watcher 应该持续运行。将其添加到您的 crontab 中
* * * * * /path/to/yii/project/yii watcher-daemon --demonize=1
Watcher 不能启动两次,一次只能有一个实例在工作。
用法
创建新的守护进程
- 在您的控制台控制器路径中创建文件 {NAME}DaemonController.php,内容如下:
<?php
namespace console\controllers;
use \advissmedia\daemon\DaemonController;
class {NAME}DaemonController extends DaemonController
{
/**
* @return array
*/
protected function defineJobs()
{
/*
TODO: return task list, extracted from DB, queue managers and so on.
Extract tasks in small portions, to reduce memory usage.
*/
}
/**
* @return jobtype
*/
protected function doJob($job)
{
/*
TODO: implement you logic
Don't forget to mark task as completed in your task source
*/
}
const DEF_FIRST_JOB = 1;
const DEF_SOCOND_JOB = 3;
/**
* {@inheritdoc}
*/
public function init()
{
parent::init();
/*
* Set type of jobs list can be processed
*/
$this->isJobsListPersistent = true;
/*
* Array connection name for renew connections for daemon process
*/
$this->connections = ['pgdb', 'sqltdb'];
}
/**
* @return array
*/
protected function defineJobs()
{
/*
TODO: return task list, extracted from DB, queue managers or jast array.
Extract tasks in small portions, to reduce memory usage.
*/
return [
['job_id' => self::DEF_FIRST_JOB],
['job_id' => self::DEF_SECOND_JOB]
];
}
/**
* @param array $job
* @return bool
* @throws \Exception
* @throws \Throwable
* @throws \yii\db\Exception
*/
protected function doJob($job)
{
/*
TODO: implement you logic
Don't forget to mark task as completed in your task source
*/
if (array_key_exists('job_id', $job)) {
switch ($job['job_id']) {
case self::DEF_FIRST_JOB:
$ret = $this->processFirstJob();
break;
case self::DEF_SECOND_JOB:
$ret = $this->processSecondJob();
break;
default:
$ret = false;
}
return $ret;
}
return false;
}
}
- 实现逻辑。
- 将新守护进程添加到监督器的守护进程列表中。
与 RabbitMQ 一起使用(此示例需要 "videlalvaro/php-amqplib" 包)
<?php
namespace console\controllers\daemons;
use console\components\controllers\BaseDaemonController;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Class SomeRabbitQueueController
*/
class SomeRabbitQueueController extends BaseDaemonController
{
/**
* @var $connection AMQPStreamConnection
*/
protected $connection;
/**
* @var $connection AMQPChannel
*/
protected $channel;
/**
* @return array|bool
*/
protected function defineJobs()
{
$channel = $this->getQueue();
while (count($channel->callbacks)) {
try {
$channel->wait(null, true, 5);
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $timeout) {
} catch (\PhpAmqpLib\Exception\AMQPRuntimeException $runtime) {
\Yii::error($runtime->getMessage());
$this->channel = null;
$this->connection = null;
}
}
return false;
}
/**
* @param AMQPMessage $job
* @return bool
* @throws NotSupportedException
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
*/
protected function doJob($job)
{
$result = false;
//do somethink here and set $result
if ($result) {
$this->ask($job);
} else {
$this->nask($job);
}
return $result;
}
/**
* @return AMQPChannel
* @throws InvalidParamException
*/
protected function getQueue()
{
if ($this->channel == null) {
if ($this->connection == null) {
if (isset(\Yii::$app->params['rabbit'])) {
$rabbit = \Yii::$app->params['rabbit'];
} else {
throw new InvalidParamException('Bad config RabbitMQ');
}
$this->connection = new AMQPStreamConnection($rabbit['host'], $rabbit['port'], $rabbit['user'], $rabbit['password']);
}
$this->channel = $this->connection->channel();
$this->channel->exchange_declare($this->exchange, $this->type, false, true, false);
$args = [];
if ($this->dlx) {
$args['x-dead-letter-exchange'] = ['S', $this->exchange];
$args['x-dead-letter-routing-key'] = ['S',$this->dlx];
}
if ($this->max_length) {
$args['x-max-length'] = ['I', $this->max_length];
}
if ($this->max_bytes) {
$args['x-max-length-bytes'] = ['I', $this->max_bytes];
}
if ($this->max_priority) {
$args['x-max-priority'] = ['I', $this->max_priority];
}
list($queue_name, ,) = $this->channel->queue_declare($this->queue_name, false, true, false, false, false, $args);
foreach ($this->binding_keys as $binding_key) {
$this->channel->queue_bind($queue_name, $this->exchange, $binding_key);
}
$this->channel->basic_consume($queue_name, '', false, false, false, false, [$this, 'doJob']);
}
return $this->channel;
}
/**
* @param $job
*/
protected function ask($job)
{
$job->delivery_info['channel']->basic_ack($job->delivery_info['delivery_tag']);
}
/**
* @param $job
*/
protected function nask($job)
{
$job->delivery_info['channel']->basic_nack($job->delivery_info['delivery_tag']);
}
}
守护进程设置(属性)
在您的守护进程中,您可以覆盖父属性
$demonize
- 如果为 0,守护进程不以守护进程的形式运行,而仅作为简单的控制台应用程序。这需要用于调试。$memoryLimit
- 如果守护进程达到此限制,则守护进程停止工作。这可以防止内存泄漏。停止 WatcherDaemon 后再次运行此守护进程。$sleep
- 检查新任务之间的延迟,如果任务列表已满,则守护进程不会休眠。$pidDir
- 守护进程 pid 所在的目录$logDir
- 守护进程日志所在的目录$isMultiInstance
- 此选项允许守护进程为每个任务创建自复制。也就是说,守护进程可以同时执行多个任务。当一项任务需要一些时间且服务器资源允许执行许多此类任务时,这很有用。$maxChildProcesses
- 只有当$isMultiInstance=true
时。守护进程实例的最大数量。如果达到最大数量,则系统将等待至少一个子进程终止。$isJobsListPersistent
- 改变作业列表处理方法。如果为 false,则在有作业和空闲工作者的情况下始终处理作业列表。如果为 true,则从第一个作业到最后一个作业处理作业列表,并在睡眠时间后使工作者再次开始处理第一个作业。
守护进程控制
守护进程捕获一些信号并执行一些操作。以下是捕获的信号和守护进程在收到这些信号时的操作列表
- SIGINT,SIGTERM - 守护进程关闭,从循环中跳出写入日志数据并关闭使用的资源。
- SIGCHLD - 关闭所有子工作者,主守护进程进程仍然工作。
- SIGUSR1 - 将有关守护进程运行时间的 Linux 系统日志信息写入。
日志设置
如果您想更改日志设置,可以覆盖 initLogger 函数。示例
/**
* Adjusting logger. You can override it.
*/
protected function initLogger()
{
$targets = \Yii::$app->getLog()->targets;
foreach ($targets as $name => $target) {
$target->enabled = false;
}
$config = [
'levels' => ['error', 'warning', 'trace', 'info'],
'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->shortClassName() . '.log',
'logVars'=>[], // Don't log all variables
'enableRotation' => true, // Enable log rotation if log file exceed 10Mb as default
'exportInterval'=>1, // Write each message to disk
'except' => [
'yii\db\*', // Don't include messages from db
],
];
$targets['daemon'] = new \yii\log\FileTarget($config);
\Yii::$app->getLog()->targets = $targets;
\Yii::$app->getLog()->init();
// Flush each message
\Yii::$app->getLog()->flushInterval = 1;
}
在 PHP < 5.5.0 上安装 Proctitle
您需要在您的服务器上安装 proctitle 扩展才能安装 yii2-daemon。对于 Debian 7,可以使用以下命令
# pecl install channel://pecl.php.net/proctitle-0.1.2
# echo "extension=proctitle.so" > /etc/php5/mods-available/proctitle.ini
# php5enmod proctitle