windbridges/process-messaging

允许将进程输出序列化为消息

dev-master 2021-09-10 19:47 UTC

This package is auto-updated.

Last update: 2024-09-11 02:18:56 UTC


README

安装

composer require windbridges/process-messaging

关于

此包有助于与子进程通信。子进程使用 ProcessMessaging::send() 将序列化的消息写入 STDOUT,父进程使用 WindBridges\ProcessMessaging\Process 捕获这些消息,它是对 Symfony Process 的包装 (https://github.com/symfony/process)。 ProcessMessaging 还会捕获任何 echo,并支持 Symfony 的 VarDumper 输出(以及 dump/dd 输出)。

  • 捕获子进程的每个 echo 消息
  • 捕获使用 ProcessMessaging::send 发送的定制消息
  • 捕获从子脚本抛出的异常

用法

处理 echo

在调用 ProcessMessaging::handleAll() 之后,每个 echo 输出都将序列化为一个消息,该消息可以在父 PHP 进程中反序列化。

// child.php

use WindBridges\ProcessMessaging\ProcessMessaging;

require "vendor/autoload.php";

ProcessMessaging::handleAll();

echo 'Echo message text';
// parent.php
use WindBridges\ProcessMessaging\Process;

require "vendor/autoload.php";

$proc = new Process(['php', 'child.php']);

$proc->onEcho(function (string $buffer) {
    // $buffer contains 'Echo message text' here 
    echo "Output from parent: $buffer\n";
});

$proc->run();

注意! 如果在发送任何输出之前没有调用 ProcessMessaging::handleAll(),则将中断执行,因为父进程将尝试反序列化纯输出。然而,在开发期间手动运行子脚本可能很有用。例如

// child.php

use Symfony\Component\Console\Input\ArgvInput;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputDefinition;
use WindBridges\ProcessMessaging\ProcessMessaging;

require "vendor/autoload.php";

// Let's set PROCESS_MESSAGING env variable when creating process to understand if we launched child from parent or manually
if (getenv('PROCESS_MESSAGING')) {
    // Child is launched from parent process,
    // so call handleAll() to serialize messages
    ProcessMessaging::handleAll();
    // And read input data from STDIN
    $data = json_decode(fgets(STDIN), true);
} else {
    // If we are here, then we launched the child manually,
    // so don't call handleAll(), and read input from options
    // (or any other way you want)  
    $input = new ArgvInput($argv, new InputDefinition([
        new InputArgument('n1', InputArgument::REQUIRED)
    ]));

    $data = [
        'n1' => $input->getArgument('n1')
    ];
}
发送自定义消息

此包的主要功能是发送自定义消息。它们可以包含标量数据、数组、对象等,但不能包含资源和其他无法序列化的东西。

// child.php

use WindBridges\ProcessMessaging\ProcessMessaging;

require "vendor/autoload.php";

class MyObject
{
    protected $a;
}

ProcessMessaging::send('A string');
ProcessMessaging::send([1, 2, 3]);
ProcessMessaging::send(new MyObject());
// ...
// parent.php
use WindBridges\ProcessMessaging\Process;

require "vendor/autoload.php";

$proc = new Process(['php', 'child.php']);

$proc->onMessage(function ($object) {
    // Get your $object here 
    var_dump($object);
});

$proc->run();
处理异常

异常会自动包装到 SerializableException 类中,以防止堆栈跟踪的序列化。跟踪作为 traceAsString() 方法的返回结果存储。

// child.php

use WindBridges\ProcessMessaging\ProcessMessaging;

require "vendor/autoload.php";

ProcessMessaging::handleAll();

throw new Exception('Test exception');
// parent.php

use WindBridges\ProcessMessaging\Process;
use WindBridges\ProcessMessaging\SerializableException;

require "vendor/autoload.php";

$proc = new Process(['php', 'child.php']);

$proc->onException(function (SerializableException $exception)  {
    echo $exception->getMessage();
});

$proc->run();

进程池

使用进程池,我们可以以有限的并发性运行多个进程。

// parent.php

use WindBridges\ProcessMessaging\Process;
use WindBridges\ProcessMessaging\ProcessPool;

require "vendor/autoload.php";

$process = new Process(
    ['php', 'child.php'], // commandline
    null,
    ['CHILD_PROC' => true], // tell to the child that it launched from parent 
    null,
    86400 // process timeout
); 

$process->setTag('Child');

$data = [1, 2, 3, 4, 5];

$pool = new ProcessPool(function () use($process, $data) {
    // spawn separate process for each data item
    foreach($data as $n) {
        $process = clone $process;
        $process->setInput(json_encode(['n' => $n]));
        yield $process;    
    }
});

$pool->setConcurrency(5);
$pool->run();
// child.php

require "vendor/autoload.php";