symplely/spawn

一个简单的 `uv_spawn` 或 `proc-open` 包装 API,用于执行和管理子进程池,实现并行/异步 PHP 阻塞 I/O。

5.0.1 2023-03-07 16:40 UTC

README

LinuxWindowsmacOScodecovCodacy BadgeMaintainability

一个简单的 uv_spawnproc-open 包装 API,用于 执行管理 一组 子进程,实现并行/异步 PHP 阻塞 I/O。

目录

本包使用 libuv 的功能,Node.js 库的 PHP 扩展 ext-uv。它使用 uv_spawn 函数启动进程。其性能优于 pcntl-extension 或使用 proc_open。如果未安装 libuv,则此包将回退到使用 [symfony/process]。

此包是我们 symplely/coroutine 包的一部分,用于处理任何无法由 Coroutine 本地处理的 阻塞 I/O 进程。

要了解更多关于 libuv 的功能,请阅读在线教程

版本 3x 及以上版本的术语已更改,以与 ext-parallel 扩展使用保持一致,并作为 Thread 运行,但不受该库扩展限制的许多限制。

ChanneledFuture 类都设计为可扩展,以创建基于 Parallel 的库的自定义 实现。目前需要 libuv 来获得实现的最大好处。

安装

composer require symplely/spawn

如果可用,此包将使用 libuv 功能。执行以下操作之一进行安装。

对于类似 Debian 的发行版,Ubuntu...

apt-get install libuv1-dev php-pear -y

对于类似 RedHat 的发行版,CentOS...

yum install libuv-devel php-pear -y

现在可以通过 Pecl 自动编译、安装和设置。

pecl channel-update pecl.php.net
pecl install uv-beta

对于 Windows,有好的消息,通过 libuv 的本地 async 已到来。

稳定 PHP 版本的 Windows 构建可以从 PECL 获取。

直接从 https://windows.php.net/downloads/pecl/releases/uv/ 下载最新版本

libuv.dll 提取到与 PHP 二进制可执行文件相同的目录,并将 php_uv.dll 提取到 ext\ 目录。

在 php.ini 中启用扩展 php_sockets.dllphp_uv.dll

cd C:\Php
Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.2-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
#Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.3-nts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
#Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.4-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
7z x -y php_uv-0.2.4.zip libuv.dll php_uv.dll
copy php_uv.dll ext\php_uv.dll
del php_uv.dll
del php_uv-0.2.4.zip
echo extension=php_sockets.dll >> php.ini
echo extension=php_uv.dll >> php.ini

用法

include 'vendor/autoload.php';

use Async\Spawn\Spawn;

// Shows output by default and Channel instance is extracted from args.
$future = \parallel($function, ...$args)
// Shows output by default, turns on yield usage, can include additional file, and the Channel instance is extracted from args.
$future = \paralleling($function, $includeFile, ...$args)
// Or Does not show output by default and channel instance has to be explicitly passed in.
$future = \spawn($function, $timeout, $channel)
// Or Show output by default and channel instance has to be explicitly passed in.
$future = \spawning($function, $timeout, $channel)
// Or
$future = Spawn::create(function () use ($thing) {
    // Do a thing
    }, $timeout, $channel)
    ->then(function ($output) {
        // Handle success
    })->catch(function (\Throwable $exception) {
        // Handle exception
});

// Wait for `Future` to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
// Second option can be used to set to display child output, default is false
\spawn_run($future, true);
// Or same as
$future->displayOn()->run();
// Or
$future->run();

通道在子进程和父进程之间传输消息

该功能已完全重新设计,以与 PHP ext-parallel 扩展类似。

请参阅 Channel 页面以获取真实示例。

include 'vendor/autoload.php';

use Async\Spawn\Channeled as Channel;

$channel = Channel::make("io");

// Shows output by default and Channel instance is extracted for args.
$future = parallel(function ($channel) {
  $channel = Channel::open($channel);

  for ($count = 0; $count <= 10; $count++) {
    $channel->send($count);
  }

  echo 'pingpangpong';
  $channel->send(false);

  return 'return whatever';
}, (string) $channel);

while (($value = $channel->recv()) !== false) {
  var_dump($value);
}

echo \spawn_output($future); // pingpangpong
// Or
echo \spawn_result($future); // return whatever
// Or
echo $future->getResult(); // return whatever

事件钩子

在创建异步进程时,您将返回一个 FutureInterface 实例。您可以在 Future 进程上添加以下事件 回调 钩子。

// Shows output by default and Channel instance is extracted for args.
$future = parallel($function, ...$args)
// Or
$future = spawn($function, $timeout, $channel)
// Or
$future = Spawn::create(function () {
        // The second argument is optional, Defaults no timeout,
        // it sets The maximum amount of time a process may take to finish in seconds
        // The third is the Channel instance pass to Future subprocess.

        return `whatever`|Object|Closure|; // `whatever` will be encoded, then decoded by parent.
    }, int $timeout = 0 , $input = null)
    ->then(function ($result) {
        // On success, `$result` is returned by the process.
    })
    ->catch(function ($exception) {
        // When an exception is thrown from within a process, it's caught and passed here.
    })
    ->timeout(function () {
        // When an timeout is reached, it's caught and passed here.
    })
    ->progress(function ($type, $data) {
        // Live progressing of output: `$type, $data` is returned by the Future process.
        // $type is `ERR` for stderr, or `OUT` for stdout.
    })
    ->signal($signal, function ($signal) {
        // The process will be sent termination `signal` and stopped.
        // When an signal is triggered, it's caught and passed here.
        // This feature is only available using `libuv`.
    });
->then(function ($result) {
    // On success, `$result` is returned by the Future process or callable you passed.
        //
    }, function ($catchException) {
        //
    }, function ($progressOutput) {
        //
    }
);

// To turn on displaying of child output.
->displayOn();

// Stop displaying child output.
->displayOff();

// A `Future` process can be retried.
->restart();

// Wait for `Future` to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
->run();

并行

Parallel类用于管理一组Future对象。它提供了相同的事件钩子错误处理

include 'vendor/autoload.php';

use Async\Spawn\Parallel;

$parallel = new Parallel();

foreach ($things as $thing) {
        // the second argument `optional`, can set the maximum amount of time a process may take to finish in seconds.
    $parallel->add(function () use ($thing) {
        // Do a thing
    }, $optional)->then(function ($output) {
        // Handle success
        // On success, `$output` is returned by the process or callable you passed to the queue.
    })->catch(function (\Throwable $exception) {
        // Handle exception
        // When an exception is thrown from within a process, it's caught and passed here.
    });
}

// Wait for Parallel `Future` Pool to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
$parallel->wait();

并行配置

您可以根据需要创建任意数量的并行进程池,每个并行进程池都有自己的进程队列来处理。

并行进程池可由开发者进行配置。

use Async\Spawn\Parallel;

$parallel = (new Parallel())

// The maximum amount of processes which can run simultaneously.
    ->concurrency(20)

// Configure how long the loop should sleep before re-checking the process statuses in milliseconds.
    ->sleepTime(50000);

幕后

此包使用uv_spawn,并在需要时使用proc_open作为备用方案,在PHP中创建和管理子进程池。通过动态创建子进程,我们能够在并行执行PHP脚本。这种并行性在处理多个不需要相互等待的同步I/O任务时可以显著提高性能。

通过为这些任务分配单独的进程执行,底层操作系统可以负责并行运行它们。

此包提供的Parallel类通过调度和运行来处理尽可能多的进程。当生成多个进程时,每个进程都可以有不同的完成时间。

通过使用uv_run或基本的子进程轮询来等待所有进程完成,直到所有进程都完成。

当一个进程完成时,将触发其成功事件,您可以使用->then()函数来挂钩。当进程失败时,将触发一个错误事件,您可以使用->catch()函数来挂钩。当进程超时时,将触发一个超时事件,您可以使用->timeout()函数来挂钩。

然后迭代将更新该进程的状态并继续执行。

与原作者的 "Spatie/Async" 的区别

此包与原作者的spatie/async实现有所不同。

  • Runnable类是具有扩展功能的Future
  • Pool类是Parallel类,其中一些功能提取到了另一个类FutureHandler中。
  • ParentRuntime类是Spawn类,它可以接受一个string命令行操作以执行,返回一个Future
  • async函数是带有额外启动功能的spawn,它会显示任何子进程的输出。
  • 移除了输出限制,除非在Future中设置超时,否则没有超时,添加了所有SymfonyProcess功能。
  • 不仅限于LinuxCLI,在WindowsApple macOSWeb环境中也能以相同的方式运行。
  • 添加了对libuv事件循环库的支持,现在是主要的使用模型,如果未安装,则回退到proc-open进程。
  • Libuv允许更直接的Channel消息交换,与proc-open相同,但功能更有限。

待办事项:将所有类似于ext-parallel的功能从外部Coroutine库中迁移过来。

之前提交的PR解决了真正的Windows支持问题。

如何将库集成到您的项目/包中

当您将此库包含到项目中时,您不能直接执行spawn_wait()spawn_run()wait()run()函数/方法。它们主要用于在本地测试此库。您需要适应或创建自定义的事件循环例程。

Parallel类有一个getFutureHandler()方法,该方法返回一个FutureHandler实例。该FutureHandler有两个方法processing()isEmpty(),您需要在自定义循环例程中调用这两个方法。这两个调用与wait()方法在while循环中调用相同的调用一样,额外增加了sleepingTime()

processing() 方法将 监控/检查 Future 的 状态,并 执行 任何适当的 事件回调 处理器。 FutureHandler 类可以 接受/处理 自定义的 事件循环,该循环定义了 executeTask(event callback, future)isPcntl() 方法。自定义的 事件循环 对象应提供给 Parallel 实例化。

添加到您的 Event Loop 的基本设置

use Async\Spawn\Parallel;
use Async\Spawn\FutureHandler;
use Async\Spawn\FutureInterface;
use Async\Spawn\ParallelInterface;

class setupLoop
{
  /**
   * @var Parallel
   */
  protected $parallel;

  /**
   * @var FutureHandler
   */
  protected $future = null;

  public function __construct() {
    $this->parallel = new Parallel($this);
    $this->future = $this->parallel->getFutureHandler();
  }

  public function addFuture($callable, int $timeout = 0, bool $display = false, $channel = null): FutureInterface {
    $future = $this->parallel->add($callable, $timeout, $channel);
    return $display ? $future->displayOn() : $future;
  }

  public function getParallel(): ParallelInterface {
    return $this->parallel;
  }

  /**
   * Check for pending I/O events, signals, futures, streams/sockets/fd activity, timers or etc...
   */
  protected function hasEvents(): bool {
    return !$this->future->isEmpty() || !$this->ActionEventsCheckers->isEmpty();
  }

  public function runLoop() {
    while ($this->hasEvents()) {
      $this->future->processing();
      if ($this->waitForAction());
        $this->DoEventActions();
    }
  }

  public function executeTask($event, $parameters = null) {
    $this->DoEventActions($event, $parameters);
    // Or just
    // if (\is_callable($event))
       // $event($parameters);
  }

  public function isPcntl(): bool {}
}

此库使用 opis/closure 包进行 closure/callable 序列化。为了使任何 函数 方法在 Future 子进程中可用,您必须修改您的 composer.json 文件以确保它被选中。 composer.json 文件应包含指向包含您始终需要的函数的文件的指针,并确保所有新类/命名空间都在其中。您不能随意创建局部 命名函数 并期望它们可用。

// composer.json
"autoload": {
    "files": [
        "Extra/functions.php"
    ],
    "psr-4": {
        "Name\\Space\\": ["Folder/"],
        "Extra\\Name\\Spaces\\": ["Extra/"]
    }
},
// functions.php
if (!\function_exists('___marker')) {
  //
  // All additional extra functions needed in a `Future` process...
  //

  function ___marker()
  {
    return true;
  }
}

错误处理

如果子进程中抛出 ExceptionError,可以通过在 ->catch() 方法中指定回调来按进程捕获。

如果没有添加错误处理器,错误将在父进程中抛出。

如果子进程意外停止而没有抛出 Throwable,写入 stderr 的输出将被包装,并在父进程中作为 Async\Spawn\SpawnError 抛出。

贡献

鼓励并欢迎贡献;我总是很高兴在 Github 上收到反馈或拉取请求 :) 为错误和新功能创建 Github Issues,并就您感兴趣的评论。

许可

MIT 许可证 (MIT)。有关更多信息,请参阅 许可文件