exsyst / worker
工作子进程管理
Requires
- php: >=5.5
- exsyst/io: >=0.1 <0.2
- symfony/process: ~2.3|~3.0
Requires (Dev)
- react/event-loop: >=0.1 <0.5
- symfony/phpunit-bridge: ~2.8|~3.0
Suggests
- react/event-loop: Allows spawning evented (and shared) workers
- symfony/console: For the standalone bin/check_shared_worker command
README
EXSyst Worker 组件
工作子进程管理
操作系统兼容性
此库使用特定于 Unix 的工具,例如 lsof
(用于查找共享工作进程的进程 ID 并清除共享工作进程的文件描述符)和 getconf
(用于查询处理器数量,如果您使用自动工作池大小调整)。如果您使用这些功能,请确保这些程序已安装并可由 PHP 执行。
此库在 Windows 上不受支持。
专用工作进程
专用工作进程是最简单的工作进程类型。它只有一个主进程,并且当其主进程对其的引用超出作用域时,其通信通道会关闭(这应该在其完成消息处理后立即停止它)。它允许在主进程之外独立运行代码。
以下是一个专用工作进程的最小示例。为了简洁起见,省略了 use
和 require
。
Master.php
<?php $wf = new WorkerFactory(); $w = $wf->createWorker(MyWorkerImpl::class); for ($i = 0; $i < 10; ++$i) { $w->sendMessage($i); var_dump($w->receiveMessage()); }
MyWorkerImpl.php
作为原始工作进程
<?php class MyWorkerImpl implements RawWorkerImplementationInterface { public function run(ChannelInterface $channel) { for (; ; ) { try { $message = $channel->receiveMessage(); } catch (\UnderflowException $e) { // The master closed the connection (its Worker object went out of scope) break; } $channel->sendMessage($message . " squared is " . ($message * $message)); } } }
MyWorkerImpl.php
作为事件工作进程
事件工作进程需要 react/event-loop
。对于简单用例,它们可能比原始工作进程更复杂,但对于复杂用例,它们的强大功能将导致通常更简单的工作进程实现。
<?php class MyWorkerImpl implements EventedWorkerImplementationInterface { public function setLoop(LoopInterface $loop) { } public function initialize() { } public function onConnect(ChannelInterface $channel, $peerName) { } public function onDisconnect(ChannelInterface $channel) { } public function terminate() { } public function onMessage($message, ChannelInterface $channel, $peerName) { $channel->sendMessage($message . " squared is " . ($message * $message)); } }
专用工作进程池
工作进程池可以简化多个并行、专用工作进程的管理。
以下是一个专用工作进程池的最小示例。与前面的示例一样,为了简洁起见,省略了 use
和 require
。 MyWorkerImpl.php
是之前看到的文件之一。
MasterWithPool.php
<?php $wf = new WorkerFactory(); $wp = $wf->createWorkerPool(MyWorkerImpl::class, 4); $i = 0; $busy = 0; foreach ($wp as $w) { if ($i >= 10) { break; } $w->sendMessage($i); ++$busy; ++$i; } while ($i < 10) { // WorkerPool->receiveMessage takes an output parameter, which it fills // with the worker which actually received the returned message var_dump($wp->receiveMessage($w)); $w->sendMessage($i); ++$i; } while ($busy > 0) { var_dump($wp->receiveMessage($w)); --$busy; }
共享工作进程
共享工作进程是一种事件工作进程(见上文),可以具有多个主进程。它不是使用其标准 I/O 流与单个主进程通信,而是监听一个套接字,任何主进程都可以随时连接和断开。
仅通过从所有主进程中断开连接无法停止共享工作进程。唯一优雅地停止它的方法是发送包含其“管理cookie”的适当消息,该cookie已在其引导配置文件中配置(见下文)。它也可以通过标准 POSIX 信号终止,但在此情况下,它可能会留下一些垃圾。
以下是一个共享工作进程的最小示例。与前面的示例一样,为了简洁起见,省略了 use
和 require
。
MasterOfShared.php
<?php $wbsp = new WorkerBootstrapProfile(); $wbsp->setAdminCookie('This value is not so secret, change it in your app !'); $wf = new WorkerFactory($wbsp); if ($argc > 1 and $argv[1] == '--stop') { $wf->stopSharedWorker('unix://' . __DIR__ . '/Worker.sock'); exit; } $w = $wf->connectToSharedWorker('unix://' . __DIR__ . '/Worker.sock', MySharedWorkerImpl::class); $w->sendMessage(($argc > 1) ? $argv[1] : 'world'); var_dump($w->receiveMessage());
MySharedWorkerImpl.php
<?php class MySharedWorkerImpl implements SharedWorkerImplementationInterface { private $i; public function __construct() { $this->i = 0; } public function setLoop(LoopInterface $loop) { } public function initialize() { } public function onConnect(ChannelInterface $channel, $peerName) { } public function onDisconnect(ChannelInterface $channel) { } public function onStop() { } public function terminate() { } public function onQuery($privileged) { return 'Current counter value : ' . $this->i; } public function onMessage($message, ChannelInterface $channel, $peerName) { $channel->sendMessage("Hello " . $message . " ! You're my " . self::ordinal(++$this->i) . " client."); } private static function ordinal($n) { $units = $n % 10; $tens = (($n % 100) - $units) / 10; if ($tens == 1) { return $n . 'th'; } elseif ($units == 1) { return $n . 'st'; } elseif ($units == 2) { return $n . 'nd'; } elseif ($units == 3) { return $n . 'rd'; } else { return $n . 'th'; } } }
查询共享工作进程的状态
您可以使用 $worker->query()
或 $workerFactory->querySharedWorker($socketAddress)
向您的工作进程发送查询消息。如果您已配置“管理cookie”(见下文),则您的工
您的工
此模型旨在简化与监控系统(如 Nagios)的接口。如果您安装了 symfony/console
,您甚至可以使用此包中提供的独立 check_shared_worker
命令作为 Nagios 插件。
优雅地停止共享工作进程
该库提供了两种优雅地停止共享工作线程的方法
- 您可以在工作线程内部调用
SharedWorker::stopCurrent()
,然后手动清理所有其他资源; - 您可以使用
$worker->stop()
或$workerFactory->stopSharedWorker($socketAddress)
向工作线程发送适当的停止消息,如果您已配置了“管理cookie”(见下文)。
如果使用“管理cookie”,并且共享工作线程拥有资源(例如,例如,子工作线程池或Ratchet服务器套接字)并已将其注册到事件循环中,则在其onStop
方法中,必须注销它们或停止循环:只要还有资源注册到它,循环就不会自动停止,这将使您的工作线程无法停止,如果您忘记注销资源。
远程共享工作线程
共享工作线程支持监听Unix域套接字以及互联网域套接字。因此,它们可以暴露到网络中。
主进程可以连接到另一台机器上公开的网络共享工作线程,并且如果它知道其“管理cookie”,则可以停止它,但它无法远程启动共享工作线程。
警告:出于安全原因,请勿在公开网络的共享工作线程上使用SerializedChannelFactory
(这是默认设置)(有关更多信息,请参阅unserialize
)。相反,考虑使用使用安全格式的通道工厂,例如JsonChannelFactory
。
禁用共享工作线程
您可能想禁用共享工作线程,例如作为应用新版本部署的一部分。
为此,该库提供了一个“关闭开关”,形式为包含以下内容的JSON文件:
- 一个标志,表示是否为当前引导配置全局禁用共享工作线程(并可能用于部署);
- 一个套接字地址列表,指示哪些特定的工作线程被禁用。
您可以从工作线程工厂中操作“关闭开关”,例如
<?php $wbsp = new WorkerBootstrapProfile(); $wbsp->setKillSwitchPath(__DIR__ . '/WorkerKillSwitch.json'); $wf = new WorkerFactory($wbsp); // Save state : $global = $wf->areSharedWorkersDisabledGlobally(); $addresses = $wf->getDisabledSharedWorkers(); // Disable them globally : $wf->disableSharedWorkersGlobally(); // Disable the one from our previous example : $wf->disableSharedWorker('unix://' . __DIR__ . '/Worker.sock'); // "Revert" our changes : $wf->reEnableSharedWorker('unix://' . __DIR__ . '/Worker.sock'); $wf->reEnableSharedWorkersGlobally(); // Re-enable every single worker : $wf->reEnableAllSharedWorkers(); // Really revert our changes : if ($global) { $wf->disableSharedWorkersGlobally(); } // If it fits between foreach and as, disable/reEnableSharedWorker will accept it and will process all of its elements in a single transaction. $wf->disableSharedWorker($addresses);
请注意,禁用共享工作线程不会自动停止它们(并且全局禁用不能,因为库不会跟踪所有正在运行的工作线程的列表)。如果您想禁用正在运行的线程,您必须在禁用它们之后手动停止它们(为了避免竞争条件,它们可能在您停止它们之后、禁用它们之前重新启动)。
引导配置文件
此对象包含初始化工作线程所需的所有参数。该库旨在尝试提供强制性参数的默认值
php
或hhvm
可执行文件路径和参数(默认情况下,将使用symfony/process
自动检测)- 首选身份,这是工作线程将倾向于代表其运行的系统用户:如果它们作为
root
(或在Linux上使用CAP_SETUID
),则将setuid
为此用户(默认为无) - “第一阶段”部分,在执行任何脚本之前执行(默认为无)
- 要执行的脚本(默认情况下,组件将尝试查找
composer
的自动加载器,除非被指示不查找) - “第二阶段”部分,在执行脚本后执行,但在创建工作线程实现之前(默认为无)
- 将持有工作线程实现的变量名称(默认为
workerImpl
) - 传递给工作实现构造函数的参数,当实际使用其构造函数(默认为无)创建它时;
- “第3阶段”部分,在创建工作实现之后执行,并且可以使用指定的变量名引用它(默认为无);
- 通道工厂,可以创建通道,并且必须是可序列化的(默认为来自
exsyst/io
的SerializedChannelFactory
); - 工作将评估的事件循环表达式,在“第3阶段”之后(默认为无,这将使子进程自动调用
Factory::create()
); - 工作将评估的套接字上下文表达式,在“第3阶段”之后(默认为无,这将使共享工作者的服务器套接字在没有上下文的情况下创建);
- “管理cookie”,这是一个预先共享的秘密字符串,必须作为适当的消息发送给共享工作者,以从中获取扩展的状态信息或使其优雅地停止(默认为无,这将使得无法使用此机制优雅地停止共享工作者);
- 终止开关路径:终止开关是一个JSON文件,指示是否全局禁用了共享工作者,如果没有,则指定哪些具体的共享工作者被禁用(默认为无,这使得无法禁用共享工作者);
- 预编译的脚本映射,允许重用相同的脚本供每个使用相同实现的工作者使用,而不是使用“在
/tmp
生成、运行一次然后删除”的方法(默认为无)。
如果您在创建工作工厂时没有指定引导配置文件,它将自动创建一个,所有参数的默认值。