exsyst/worker

工作子进程管理

0.1.6 2015-12-08 16:52 UTC

This package is auto-updated.

Last update: 2024-09-09 14:29:10 UTC


README

Build Status Scrutinizer Code Quality

EXSyst Worker 组件

工作子进程管理

操作系统兼容性

此库使用特定于 Unix 的工具,例如 lsof(用于查找共享工作进程的进程 ID 并清除共享工作进程的文件描述符)和 getconf(用于查询处理器数量,如果您使用自动工作池大小调整)。如果您使用这些功能,请确保这些程序已安装并可由 PHP 执行。

此库在 Windows 上不受支持。

专用工作进程

专用工作进程是最简单的工作进程类型。它只有一个主进程,并且当其主进程对其的引用超出作用域时,其通信通道会关闭(这应该在其完成消息处理后立即停止它)。它允许在主进程之外独立运行代码。

以下是一个专用工作进程的最小示例。为了简洁起见,省略了 userequire

Master.php

<?php
$wf = new WorkerFactory();
$w = $wf->createWorker(MyWorkerImpl::class);
for ($i = 0; $i < 10; ++$i) {
  $w->sendMessage($i);
  var_dump($w->receiveMessage());
}

MyWorkerImpl.php 作为原始工作进程

<?php
class MyWorkerImpl implements RawWorkerImplementationInterface
{
  public function run(ChannelInterface $channel)
  {
    for (; ; ) {
      try {
        $message = $channel->receiveMessage();
      } catch (\UnderflowException $e) {
        // The master closed the connection (its Worker object went out of scope)
        break;
      }
      $channel->sendMessage($message . " squared is " . ($message * $message));
    }
  }
}

MyWorkerImpl.php 作为事件工作进程

事件工作进程需要 react/event-loop。对于简单用例,它们可能比原始工作进程更复杂,但对于复杂用例,它们的强大功能将导致通常更简单的工作进程实现。

<?php
class MyWorkerImpl implements EventedWorkerImplementationInterface
{
  public function setLoop(LoopInterface $loop) { }
  public function initialize() { }
  public function onConnect(ChannelInterface $channel, $peerName) { }
  public function onDisconnect(ChannelInterface $channel) { }
  public function terminate() { }

  public function onMessage($message, ChannelInterface $channel, $peerName)
  {
    $channel->sendMessage($message . " squared is " . ($message * $message));
  }
}

专用工作进程池

工作进程池可以简化多个并行、专用工作进程的管理。

以下是一个专用工作进程池的最小示例。与前面的示例一样,为了简洁起见,省略了 userequireMyWorkerImpl.php 是之前看到的文件之一。

MasterWithPool.php

<?php
$wf = new WorkerFactory();
$wp = $wf->createWorkerPool(MyWorkerImpl::class, 4);
$i = 0;
$busy = 0;
foreach ($wp as $w) {
  if ($i >= 10) {
    break;
  }
  $w->sendMessage($i);
  ++$busy;
  ++$i;
}
while ($i < 10) {
  // WorkerPool->receiveMessage takes an output parameter, which it fills
  // with the worker which actually received the returned message
  var_dump($wp->receiveMessage($w));
  $w->sendMessage($i);
  ++$i;
}
while ($busy > 0) {
  var_dump($wp->receiveMessage($w));
  --$busy;
}

共享工作进程

共享工作进程是一种事件工作进程(见上文),可以具有多个主进程。它不是使用其标准 I/O 流与单个主进程通信,而是监听一个套接字,任何主进程都可以随时连接和断开。

仅通过从所有主进程中断开连接无法停止共享工作进程。唯一优雅地停止它的方法是发送包含其“管理cookie”的适当消息,该cookie已在其引导配置文件中配置(见下文)。它也可以通过标准 POSIX 信号终止,但在此情况下,它可能会留下一些垃圾。

以下是一个共享工作进程的最小示例。与前面的示例一样,为了简洁起见,省略了 userequire

MasterOfShared.php

<?php
$wbsp = new WorkerBootstrapProfile();
$wbsp->setAdminCookie('This value is not so secret, change it in your app !');
$wf = new WorkerFactory($wbsp);
if ($argc > 1 and $argv[1] == '--stop') {
  $wf->stopSharedWorker('unix://' . __DIR__ . '/Worker.sock');
  exit;
}
$w = $wf->connectToSharedWorker('unix://' . __DIR__ . '/Worker.sock', MySharedWorkerImpl::class);
$w->sendMessage(($argc > 1) ? $argv[1] : 'world');
var_dump($w->receiveMessage());

MySharedWorkerImpl.php

<?php
class MySharedWorkerImpl implements SharedWorkerImplementationInterface
{
  private $i;

  public function __construct()
  {
    $this->i = 0;
  }

  public function setLoop(LoopInterface $loop) { }
  public function initialize() { }
  public function onConnect(ChannelInterface $channel, $peerName) { }
  public function onDisconnect(ChannelInterface $channel) { }
  public function onStop() { }
  public function terminate() { }

  public function onQuery($privileged)
  {
    return 'Current counter value : ' . $this->i;
  }

  public function onMessage($message, ChannelInterface $channel, $peerName)
  {
    $channel->sendMessage("Hello " . $message . " ! You're my " . self::ordinal(++$this->i) . " client.");
  }

  private static function ordinal($n)
  {
    $units = $n % 10;
    $tens = (($n % 100) - $units) / 10;
    if ($tens == 1) {
      return $n . 'th';
    } elseif ($units == 1) {
      return $n . 'st';
    } elseif ($units == 2) {
      return $n . 'nd';
    } elseif ($units == 3) {
      return $n . 'rd';
    } else {
      return $n . 'th';
    }
  }
}

查询共享工作进程的状态

您可以使用 $worker->query()$workerFactory->querySharedWorker($socketAddress) 向您的工作进程发送查询消息。如果您已配置“管理cookie”(见下文),则您的工

您的工

此模型旨在简化与监控系统(如 Nagios)的接口。如果您安装了 symfony/console,您甚至可以使用此包中提供的独立 check_shared_worker 命令作为 Nagios 插件。

优雅地停止共享工作进程

该库提供了两种优雅地停止共享工作线程的方法

  • 您可以在工作线程内部调用 SharedWorker::stopCurrent(),然后手动清理所有其他资源;
  • 您可以使用 $worker->stop()$workerFactory->stopSharedWorker($socketAddress) 向工作线程发送适当的停止消息,如果您已配置了“管理cookie”(见下文)。

如果使用“管理cookie”,并且共享工作线程拥有资源(例如,例如,子工作线程池或Ratchet服务器套接字)并已将其注册到事件循环中,则在其onStop方法中,必须注销它们或停止循环:只要还有资源注册到它,循环就不会自动停止,这将使您的工作线程无法停止,如果您忘记注销资源。

远程共享工作线程

共享工作线程支持监听Unix域套接字以及互联网域套接字。因此,它们可以暴露到网络中。

主进程可以连接到另一台机器上公开的网络共享工作线程,并且如果它知道其“管理cookie”,则可以停止它,但它无法远程启动共享工作线程。

警告:出于安全原因,请勿在公开网络的共享工作线程上使用SerializedChannelFactory(这是默认设置)(有关更多信息,请参阅unserialize)。相反,考虑使用使用安全格式的通道工厂,例如JsonChannelFactory

禁用共享工作线程

您可能想禁用共享工作线程,例如作为应用新版本部署的一部分。

为此,该库提供了一个“关闭开关”,形式为包含以下内容的JSON文件:

  • 一个标志,表示是否为当前引导配置全局禁用共享工作线程(并可能用于部署);
  • 一个套接字地址列表,指示哪些特定的工作线程被禁用。

您可以从工作线程工厂中操作“关闭开关”,例如

<?php
$wbsp = new WorkerBootstrapProfile();
$wbsp->setKillSwitchPath(__DIR__ . '/WorkerKillSwitch.json');
$wf = new WorkerFactory($wbsp);
// Save state :
$global = $wf->areSharedWorkersDisabledGlobally();
$addresses = $wf->getDisabledSharedWorkers();
// Disable them globally :
$wf->disableSharedWorkersGlobally();
// Disable the one from our previous example :
$wf->disableSharedWorker('unix://' . __DIR__ . '/Worker.sock');
// "Revert" our changes :
$wf->reEnableSharedWorker('unix://' . __DIR__ . '/Worker.sock');
$wf->reEnableSharedWorkersGlobally();
// Re-enable every single worker :
$wf->reEnableAllSharedWorkers();
// Really revert our changes :
if ($global) {
  $wf->disableSharedWorkersGlobally();
}
// If it fits between foreach and as, disable/reEnableSharedWorker will accept it and will process all of its elements in a single transaction.
$wf->disableSharedWorker($addresses);

请注意,禁用共享工作线程不会自动停止它们(并且全局禁用不能,因为库不会跟踪所有正在运行的工作线程的列表)。如果您想禁用正在运行的线程,您必须在禁用它们之后手动停止它们(为了避免竞争条件,它们可能在您停止它们之后、禁用它们之前重新启动)。

引导配置文件

此对象包含初始化工作线程所需的所有参数。该库旨在尝试提供强制性参数的默认值

  • phphhvm可执行文件路径和参数(默认情况下,将使用symfony/process自动检测)
  • 首选身份,这是工作线程将倾向于代表其运行的系统用户:如果它们作为root(或在Linux上使用CAP_SETUID),则将setuid为此用户(默认为无)
  • “第一阶段”部分,在执行任何脚本之前执行(默认为无)
  • 要执行的脚本(默认情况下,组件将尝试查找composer的自动加载器,除非被指示不查找)
  • “第二阶段”部分,在执行脚本后执行,但在创建工作线程实现之前(默认为无)
  • 将持有工作线程实现的变量名称(默认为workerImpl
  • 传递给工作实现构造函数的参数,当实际使用其构造函数(默认为无)创建它时;
  • “第3阶段”部分,在创建工作实现之后执行,并且可以使用指定的变量名引用它(默认为无);
  • 通道工厂,可以创建通道,并且必须是可序列化的(默认为来自 exsyst/ioSerializedChannelFactory);
  • 工作将评估的事件循环表达式,在“第3阶段”之后(默认为无,这将使子进程自动调用 Factory::create());
  • 工作将评估的套接字上下文表达式,在“第3阶段”之后(默认为无,这将使共享工作者的服务器套接字在没有上下文的情况下创建);
  • “管理cookie”,这是一个预先共享的秘密字符串,必须作为适当的消息发送给共享工作者,以从中获取扩展的状态信息或使其优雅地停止(默认为无,这将使得无法使用此机制优雅地停止共享工作者);
  • 终止开关路径:终止开关是一个JSON文件,指示是否全局禁用了共享工作者,如果没有,则指定哪些具体的共享工作者被禁用(默认为无,这使得无法禁用共享工作者);
  • 预编译的脚本映射,允许重用相同的脚本供每个使用相同实现的工作者使用,而不是使用“在 /tmp 生成、运行一次然后删除”的方法(默认为无)。

如果您在创建工作工厂时没有指定引导配置文件,它将自动创建一个,所有参数的默认值。