axxapy / php_threadpool
Thread and thread pool wrappers for pcntl (not real threads; implemented via fork)
v1.0.1
2020-04-06 03:11 UTC
Requires
- ext-pcntl: *
- axxapy/php_core: ^1.0
This package is auto-updated.
Last update: 2024-09-22 14:46:21 UTC
README
Example

    $Threads = [];
    $Threads[] = new Thread(function (Thread $Thread) {
        $Thread->saveData(file_get_contents($Thread->getLaunchParams()[0]));
    });
    $Threads[] = clone $Threads[0];
    $Threads[] = clone $Threads[0];

    $Threads[0]->run('http://ya.ru');
    $Threads[1]->run('http://mail.ru');
    $Threads[2]->run('http://r0.ru');

    // wait until every child process has exited
    while (array_filter($Threads, function (Thread $Thread) { return $Thread->isChildAlive(); })) {
        sleep(1);
    }

    array_walk($Threads, function (Thread $Thread) {
        var_dump($Thread->getSavedResult());
    });
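Because these "threads" are forked processes, a child cannot write into a variable that the parent sees; the result has to cross a process boundary. The following is a minimal sketch of that idea, assuming only ext-pcntl and a Unix socket pair. It is not the library's implementation, and `run_in_fork` is a hypothetical name introduced for illustration. Unlike the example above, it waits for the child before returning.

```php
<?php
// Hypothetical helper: runs $task in a forked child and returns its result.
// Illustrates fork-based "threads"; NOT the library's actual implementation.
function run_in_fork(callable $task, ...$args)
{
    // A connected socket pair lets the child send data back to the parent.
    $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    if ($pair === false) {
        throw new RuntimeException('stream_socket_pair() failed');
    }

    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('pcntl_fork() failed');
    }

    if ($pid === 0) {
        // Child: compute, serialize the result, write it, and exit.
        fclose($pair[0]);
        fwrite($pair[1], serialize($task(...$args)));
        fclose($pair[1]);
        exit(0);
    }

    // Parent: read until the child closes its end, then reap the child.
    fclose($pair[1]);
    $payload = stream_get_contents($pair[0]);
    fclose($pair[0]);
    pcntl_waitpid($pid, $status);

    return unserialize($payload);
}
```

For instance, `run_in_fork('strlen', 'hello')` evaluates `strlen` in a child process and hands the integer back to the caller. Serialization is the design choice here: anything the child returns must survive `serialize()`, so resources and closures cannot be passed back this way.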
ThreadPool
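- Allows running an unlimited number of threads.
- Manages child tasks and restarts them if they have not marked themselves as "finished".
- Useful for running scripts that leak memory.
- Can be used to implement daemons.

Example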
    $res = (new ThreadPool(10)) // threads count
        ->setTask(function (Thread $Thread) {
            $data = $Thread->getSavedData();
            $i = isset($data['i']) ? $data['i'] : 0;
            $i++;
            var_dump($i);
            $Thread->saveResult([
                'i'   => $i,
                'num' => $Thread->getThreadNumber(),
            ]);
            if ($i >= 10) { // script will exit after 10 iterations/launches
                $Thread->markFinished();
            }
        })
        ->setInterruptedHandler(function (Thread $Thread, $signo) {
            // dump current state before exit
            file_put_contents('worker_' . $Thread->getThreadNumber(), json_encode($Thread->getSavedResult()));
            trigger_error('Interrupted with signal ' . $signo);
        })
        ->run();

    var_dump($res);
    // will return [
    //     0 => [
    //         'i' => 10,
    //         'num' => 0
    //     ],
    //     1 => [
    //         'i' => 10,
    //         'num' => 1
    //     ]
    // ];
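The restart behaviour listed for ThreadPool can be sketched as a supervisor loop that forks a fresh child for every launch until the task reports that it is done. This is only an illustration under stated assumptions: `run_until_finished` is a hypothetical name, and using the child's exit code as the "finished" flag is an assumption, not necessarily how the library tracks `markFinished()`.

```php
<?php
// Hypothetical supervisor: re-forks $worker until it reports completion.
// Assumption: exit code 0 means "finished", anything else means "launch again".
function run_until_finished(callable $worker, int $maxLaunches = 100): int
{
    for ($launch = 1; $launch <= $maxLaunches; $launch++) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('pcntl_fork() failed');
        }

        if ($pid === 0) {
            // Child: a fresh process per launch, so any memory it leaks
            // is returned to the OS when it exits.
            exit($worker($launch) ? 0 : 1);
        }

        // Parent: wait for this launch to end and inspect how it ended.
        pcntl_waitpid($pid, $status);
        if (pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0) {
            return $launch; // number of launches it took
        }
    }

    throw new RuntimeException('worker never finished');
}
```

Each launch runs in its own short-lived process, which is why this pattern suits leaky scripts: memory never accumulates across launches. A worker such as `function (int $n) { return $n >= 3; }` would be launched three times before the loop stops.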