windbridges / process-messaging
允许将进程输出序列化为消息
dev-master
2021-09-10 19:47 UTC
Requires
- symfony/process: 5.3.x-dev
Requires (Dev)
- phpunit/phpunit: 9.5.x-dev
This package is auto-updated.
Last update: 2024-09-11 02:18:56 UTC
README
安装
composer require windbridges/process-messaging
关于
此包有助于与子进程通信。子进程使用 ProcessMessaging::send() 将序列化的消息写入 STDOUT,父进程使用 WindBridges\ProcessMessaging\Process 捕获这些消息,它是对 Symfony Process 的包装 (https://github.com/symfony/process)。 ProcessMessaging 还会捕获任何 echo,并支持 Symfony 的 VarDumper 输出(以及 dump/dd 输出)。
- 捕获子进程的每个
echo消息 - 捕获使用
ProcessMessaging::send发送的定制消息 - 捕获从子脚本抛出的异常
用法
处理 echo
在调用 ProcessMessaging::handleAll() 之后,每个 echo 输出都将序列化为一个消息,该消息可以在父 PHP 进程中反序列化。
// child.php use WindBridges\ProcessMessaging\ProcessMessaging; require "vendor/autoload.php"; ProcessMessaging::handleAll(); echo 'Echo message text';
// parent.php use WindBridges\ProcessMessaging\Process; require "vendor/autoload.php"; $proc = new Process(['php', 'child.php']); $proc->onEcho(function (string $buffer) { // $buffer contains 'Echo message text' here echo "Output from parent: $buffer\n"; }); $proc->run();
注意! 如果在发送任何输出之前没有调用 ProcessMessaging::handleAll(),则将中断执行,因为父进程将尝试反序列化纯输出。然而,在开发期间手动运行子脚本可能很有用。例如
// child.php use Symfony\Component\Console\Input\ArgvInput; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputDefinition; use WindBridges\ProcessMessaging\ProcessMessaging; require "vendor/autoload.php"; // Let's set PROCESS_MESSAGING env variable when creating process to understand if we launched child from parent or manually if (getenv('PROCESS_MESSAGING')) { // Child is launched from parent process, // so call handleAll() to serialize messages ProcessMessaging::handleAll(); // And read input data from STDIN $data = json_decode(fgets(STDIN), true); } else { // If we are here, then we launched the child manually, // so don't call handleAll(), and read input from options // (or any other way you want) $input = new ArgvInput($argv, new InputDefinition([ new InputArgument('n1', InputArgument::REQUIRED) ])); $data = [ 'n1' => $input->getArgument('n1') ]; }
发送自定义消息
此包的主要功能是发送自定义消息。它们可以包含标量数据、数组、对象等,但不能包含资源和其他无法序列化的东西。
// child.php use WindBridges\ProcessMessaging\ProcessMessaging; require "vendor/autoload.php"; class MyObject { protected $a; } ProcessMessaging::send('A string'); ProcessMessaging::send([1, 2, 3]); ProcessMessaging::send(new MyObject()); // ...
// parent.php use WindBridges\ProcessMessaging\Process; require "vendor/autoload.php"; $proc = new Process(['php', 'child.php']); $proc->onMessage(function ($object) { // Get your $object here var_dump($object); }); $proc->run();
处理异常
异常会自动包装到 SerializableException 类中,以防止堆栈跟踪的序列化。跟踪作为 traceAsString() 方法的返回结果存储。
// child.php use WindBridges\ProcessMessaging\ProcessMessaging; require "vendor/autoload.php"; ProcessMessaging::handleAll(); throw new Exception('Test exception');
// parent.php use WindBridges\ProcessMessaging\Process; use WindBridges\ProcessMessaging\SerializableException; require "vendor/autoload.php"; $proc = new Process(['php', 'child.php']); $proc->onException(function (SerializableException $exception) { echo $exception->getMessage(); }); $proc->run();
进程池
使用进程池,我们可以以有限的并发性运行多个进程。
// parent.php use WindBridges\ProcessMessaging\Process; use WindBridges\ProcessMessaging\ProcessPool; require "vendor/autoload.php"; $process = new Process( ['php', 'child.php'], // commandline null, ['CHILD_PROC' => true], // tell to the child that it launched from parent null, 86400 // process timeout ); $process->setTag('Child'); $data = [1, 2, 3, 4, 5]; $pool = new ProcessPool(function () use($process, $data) { // spawn separate process for each data item foreach($data as $n) { $process = clone $process; $process->setInput(json_encode(['n' => $n])); yield $process; } }); $pool->setConcurrency(5); $pool->run();
// child.php require "vendor/autoload.php";