innmind/process-manager

此包已被弃用,不再维护。作者建议使用 innmind/mantle 包。

用于处理子进程的库

4.2.0 2023-09-23 14:16 UTC

This package is auto-updated.

Last update: 2023-11-11 11:16:07 UTC


README

Build Status codecov Type Coverage

通过 pcntl_fork 执行并行代码的简单库。

安装

composer require innmind/process-manager

用法

use Innmind\ProcessManager\{
    Manager\Parallel,
    Runner\SubProcess,
};
use Innmind\OperatingSystem\Factory;
use Innmind\Immutable\{
    Sequence,
    Str,
};
use GuzzleHttp\Client;

$urls = Sequence::strings(
    'http://google.com',
    'http://github.com',
    'http://wikipedia.org'
);
$http = new Client;
$os = Factory::buid();
$runner = new SubProcess($os->process());
$crawl = $urls->reduce(
    Parallel::of($runner),
    static function(Parallel $parallel, string $url) use ($http): Parallel {
        return $parallel->schedule(static function() use ($http, $url): void {
            \file_put_contents(
                '/tmp/'.md5($url),
                (string) $http->get($url)->getBody(),
            );
        });
    }
);
$crawling = $crawl->start()->match(
    static fn($crawling) => $crawling,
    static fn() => throw new \RuntimeException('Failed to start crawlers'),
);
echo 'These urls are being crawled in parallel: '.Str::of(', ')->join($urls);
$crawling->wait()->match(
    static fn() => null, // finished
    static fn() => throw new \RuntimeException('A process failed'),
);

此示例将通过子进程并行爬取 3 个 URL。

重要:使用此代码无法返回值,如果您想将内容返回给父进程,您需要实现基于套接字或共享内存的 IPC(这可能在未来的版本中实现)。

Pool 实现与 Parallel 相同的接口,但您需要指定希望允许的最大子进程数,即 Pool::of(2, $runner, $sockets) 将允许最多 2 个子进程并行。

重要:当您启动池时,仅调用计划中的前 n 个函数,您绝对需要调用 wait 方法,以便调用剩余的函数。

示例

use Innmind\ProcessManager\Manager\Pool;

$pool = Pool::of(2, $runner, $os->sockets())
    ->schedule(function() {
        sleep(10);
    })
    ->schedule(function() {
        sleep(5);
    })
    ->schedule(function() {
        sleep(60);
    });
// no process started yet, same behaviour as Parallel
$running = $pool->start()->match(
    static fn($running) => $running,
    static fn() => throw new \RuntimeException,
);
// first two functions are started as sub processes
/*
do some code that last more than 10 seconds...
 */
// third function still not started
$pool->wait()->match( // this will run all the remaining functions
    static fn() => null, // finished
    static fn() => throw new \RuntimeException,
);