innmind / process-manager
4.2.0
2023-09-23 14:16 UTC
Requires
- php: ~8.2
- innmind/immutable: ~4.15|~5.0
- innmind/operating-system: ~3.0
- innmind/stream: ~4.0
Requires (Dev)
- innmind/coding-standard: ~2.0
- phpunit/phpunit: ~9.0
- vimeo/psalm: ~5.15
README
通过 pcntl_fork
执行并行代码的简单库。
安装
composer require innmind/process-manager
用法
use Innmind\ProcessManager\{ Manager\Parallel, Runner\SubProcess, }; use Innmind\OperatingSystem\Factory; use Innmind\Immutable\{ Sequence, Str, }; use GuzzleHttp\Client; $urls = Sequence::strings( 'http://google.com', 'http://github.com', 'http://wikipedia.org' ); $http = new Client; $os = Factory::buid(); $runner = new SubProcess($os->process()); $crawl = $urls->reduce( Parallel::of($runner), static function(Parallel $parallel, string $url) use ($http): Parallel { return $parallel->schedule(static function() use ($http, $url): void { \file_put_contents( '/tmp/'.md5($url), (string) $http->get($url)->getBody(), ); }); } ); $crawling = $crawl->start()->match( static fn($crawling) => $crawling, static fn() => throw new \RuntimeException('Failed to start crawlers'), ); echo 'These urls are being crawled in parallel: '.Str::of(', ')->join($urls); $crawling->wait()->match( static fn() => null, // finished static fn() => throw new \RuntimeException('A process failed'), );
此示例将通过子进程并行爬取 3 个 URL。
重要:使用此代码无法返回值,如果您想将内容返回给父进程,您需要实现基于套接字或共享内存的 IPC(这可能在未来的版本中实现)。
池
Pool
实现与 Parallel
相同的接口,但您需要指定希望允许的最大子进程数,即 Pool::of(2, $runner, $sockets)
将允许最多 2 个子进程并行。
重要:当您启动池时,仅调用计划中的前 n
个函数,您绝对需要调用 wait
方法,以便调用剩余的函数。
示例
use Innmind\ProcessManager\Manager\Pool; $pool = Pool::of(2, $runner, $os->sockets()) ->schedule(function() { sleep(10); }) ->schedule(function() { sleep(5); }) ->schedule(function() { sleep(60); }); // no process started yet, same behaviour as Parallel $running = $pool->start()->match( static fn($running) => $running, static fn() => throw new \RuntimeException, ); // first two functions are started as sub processes /* do some code that last more than 10 seconds... */ // third function still not started $pool->wait()->match( // this will run all the remaining functions static fn() => null, // finished static fn() => throw new \RuntimeException, );