jenner / simple_fork
基于pcntl的简单多进程管理器
1.2.2
2017-03-22 03:46 UTC
Requires
- php: >=5.3.0
README
中文README.MD
Simple Fork框架基于PCNTL扩展,其接口类似于Java中的Thread
和Runnable
。
为什么选择SimpleFork
对于新手来说,编写多进程程序是比较困难的。你必须考虑到如何恢复僵尸进程、进程间通信,特别是处理进程信号。SimpleFork框架提供了一些接口,类似于Java的Thread
以及进程池、同步和IPC的解决方案。你不需要关心如何控制多进程。
需求
composer require jenner/simple_fork
或
require '/path/to/simple-fork-php/autoload.php'
依赖
必须
- php > 5.3.0
- ext-pcntl进程控制
可选
- ext-sysvmsg消息队列
- ext-sysvsem信号量
- ext-sysvshm共享内存
- ext-redis redis缓存和redis消息队列
属性
- 进程池和固定池
- 自动恢复僵尸进程
- 共享内存、系统V消息队列、信号量锁、文件锁、redis缓存、redis队列
- 创建进程的三种方式:扩展Process、实现Runnable或使用带有回调函数的进程对象
- 你可以获取子进程的状态
- 如果你想要,你可以停止任何进程,或者只是关闭所有进程
- 你可以通过reload()方法重新加载进程,然后进程将退出并启动新的进程。
进程池
当你有多个进程或任务需要管理时,你可以使用两种池:Pool和FixedPool。
- Pool:你可以在一个Pool对象中执行不同的进程。然后调用
wait
方法等待所有子进程退出(或者做其他事情,但不要忘记调用wait
方法) - ParallelPool:它将保持子进程的数量,你应该在FixedPool开始之前不要初始化任何socket连接(在多进程中共享socket连接是危险的)。这个类有一个
reload
方法,可以重新加载所有子进程。当你调用reload
方法时,主进程将启动新的N个进程并关闭旧的进程。 - SinglePool:无论你执行多少进程,它都会保持一个进程启动,并在它停止后启动另一个进程。
- FixedPool:无论你执行多少进程,它都会保持N个进程启动,并在它们停止后启动新的进程。活跃进程的数量将永远小于N+1。
注意
- 请记住,你应该调用
Process::dispatchSignal
方法来调用挂起的信号处理器。 - 不建议在程序开始时添加
declare(ticks=n);
来处理挂起的信号。 - 处理单个进程的更好方法是调用
pcntl_signal_dispatch
而不是declare
,因为这会浪费CPU资源。 - 如果子进程频繁快速退出,你应该将
n
设置为一个小整数,否则设置一个大整数值以节省CPU时间。 - 如果你想在主进程中注册信号处理器,子进程将继承该处理器。
- 如果你想在子进程启动之前注册信号处理器,你可以调用
Process::registerSignalHandler
方法。子进程的start
方法被调用时,它会自动注册信号处理器。
示例
更多示例请见[示例](https://github.com/huyanping/simple-fork-php/tree/master/examples 示例)字典
一个简单的示例。
class TestRunnable implements \Jenner\SimpleFork\Runnable{ /** * Entrance * @return mixed */ public function run() { echo "I am a sub process" . PHP_EOL; } } $process = new \Jenner\SimpleFork\Process(new TestRunnable()); $process->start(); $process->wait();
使用回调的过程
$process = new \Jenner\SimpleFork\Process(function(){ for($i=0; $i<3; $i++){ echo $i . PHP_EOL; sleep(1); } }); $process->start(); $process->wait();
使用共享内存进行进程通信
class Producer extends \Jenner\SimpleFork\Process{ public function run(){ $cache = new \Jenner\SimpleFork\Cache\SharedMemory(); //$cache = new \Jenner\SimpleFork\Cache\RedisCache(); for($i = 0; $i<10; $i++){ $cache->set($i, $i); echo "set {$i} : {$i}" . PHH_EOL; } } } class Worker extends \Jenner\SimpleFork\Process{ public function run(){ sleep(5); $cache = new \Jenner\SimpleFork\Cache\SharedMemory(); //$cache = new \Jenner\SimpleFork\Cache\RedisCache(); for($i=0; $i<10; $i++){ echo "get {$i} : " . $cache->get($i) . PHP_EOL; } } } $producer = new Producer(); $worker = new Worker(); $pool = new \Jenner\SimpleFork\Pool(); $pool->execute($producer); $pool->execute($worker); $pool->wait();
使用系统V消息队列进行进程通信
class Producer extends \Jenner\SimpleFork\Process { public function run() { $queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue(); //$queue = new \Jenner\SimpleFork\Queue\RedisQueue(); for ($i = 0; $i < 10; $i++) { echo getmypid() . PHP_EOL; $queue->put($i); } } } class Worker extends \Jenner\SimpleFork\Process { public function run() { sleep(5); $queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue(); //$queue = new \Jenner\SimpleFork\Queue\RedisQueue(); for ($i = 0; $i < 10; $i++) { $res = $queue->get(); echo getmypid() . ' = ' . $i . PHP_EOL; var_dump($res); } } } $producer = new Producer(); $worker = new Worker(); $pool = new \Jenner\SimpleFork\Pool(); $pool->execute($producer); $pool->execute($worker); $pool->wait();
使用信号量锁进行进程通信
class TestRunnable implements \Jenner\SimpleFork\Runnable { /** * @var \Jenner\SimpleFork\Lock\LockInterface */ protected $sem; public function __construct() { $this->sem = \Jenner\SimpleFork\Lock\Semaphore::create("test"); //$this->sem = \Jenner\SimpleFork\Lock\FileLock::create("/tmp/test.lock"); } /** * @return mixed */ public function run() { for ($i = 0; $i < 20; $i++) { $this->sem->acquire(); echo "my turn: {$i} " . getmypid() . PHP_EOL; $this->sem->release(); sleep(1); } } } $pool = new \Jenner\SimpleFork\Pool(); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->wait();
进程池管理进程
$pool = new \Jenner\SimpleFork\Pool(); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->wait();
ParallelPool 管理进程
$fixed_pool = new \Jenner\SimpleFork\ParallelPool(new TestRunnable(), 10); $fixed_pool->start(); $fixed_pool->keep(true);
FixedPool 管理进程
$pool = new \Jenner\SimpleFork\FixedPool(2); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->wait();
SinglePool 管理进程
$pool = new \Jenner\SimpleFork\SinglePool(); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable())); $pool->wait();