jenner/simple_fork

基于pcntl的简单多进程管理器

安装数: 265,516

依赖者: 10

建议者: 0

安全: 0

星标: 224

关注者: 9

分支: 60

开放问题: 5

类型:package

1.2.2 2017-03-22 03:46 UTC

This package is not auto-updated.

Last update: 2024-09-11 09:14:17 UTC


README

Join the chat at https://gitter.im/huyanping/simple-fork-php Latest Stable Version Total Downloads Latest Unstable Version License travis Scrutinizer Code Quality Code Coverage

中文README.MD
Simple Fork框架基于PCNTL扩展,其接口类似于Java中的ThreadRunnable

为什么选择SimpleFork

对于新手来说,编写多进程程序是比较困难的。你必须考虑到如何恢复僵尸进程、进程间通信,特别是处理进程信号。SimpleFork框架提供了一些接口,类似于Java的Thread以及进程池、同步和IPC的解决方案。你不需要关心如何控制多进程。

需求

composer require jenner/simple_fork

require '/path/to/simple-fork-php/autoload.php'

依赖

必须

  • php > 5.3.0
  • ext-pcntl进程控制

可选

  • ext-sysvmsg消息队列
  • ext-sysvsem信号量
  • ext-sysvshm共享内存
  • ext-redis redis缓存和redis消息队列

属性

  • 进程池和固定池
  • 自动恢复僵尸进程
  • 共享内存、系统V消息队列、信号量锁、文件锁、redis缓存、redis队列
  • 创建进程的三种方式:扩展Process、实现Runnable或使用带有回调函数的进程对象
  • 你可以获取子进程的状态
  • 如果你想要,你可以停止任何进程,或者只是关闭所有进程
  • 你可以通过reload()方法重新加载进程,然后进程将退出并启动新的进程。

进程池

当你有多个进程或任务需要管理时,你可以使用两种池:Pool和FixedPool。

  • Pool:你可以在一个Pool对象中执行不同的进程。然后调用wait方法等待所有子进程退出(或者做其他事情,但不要忘记调用wait方法)
  • ParallelPool:它将保持子进程的数量,你应该在FixedPool开始之前不要初始化任何socket连接(在多进程中共享socket连接是危险的)。这个类有一个reload方法,可以重新加载所有子进程。当你调用reload方法时,主进程将启动新的N个进程并关闭旧的进程。
  • SinglePool:无论你执行多少进程,它都会保持一个进程启动,并在它停止后启动另一个进程。
  • FixedPool:无论你执行多少进程,它都会保持N个进程启动,并在它们停止后启动新的进程。活跃进程的数量将永远小于N+1。

注意

  • 请记住,你应该调用Process::dispatchSignal方法来调用挂起的信号处理器。
  • 不建议在程序开始时添加declare(ticks=n);来处理挂起的信号。
  • 处理单个进程的更好方法是调用pcntl_signal_dispatch而不是declare,因为这会浪费CPU资源。
  • 如果子进程频繁快速退出,你应该将n设置为一个小整数,否则设置一个大整数值以节省CPU时间。
  • 如果你想在主进程中注册信号处理器,子进程将继承该处理器。
  • 如果你想在子进程启动之前注册信号处理器,你可以调用Process::registerSignalHandler方法。子进程的start方法被调用时,它会自动注册信号处理器。

示例

更多示例请见[示例](https://github.com/huyanping/simple-fork-php/tree/master/examples 示例)字典
一个简单的示例。

class TestRunnable implements \Jenner\SimpleFork\Runnable{

    /**
     * Entrance
     * @return mixed
     */
    public function run()
    {
        echo "I am a sub process" . PHP_EOL;
    }
}

$process = new \Jenner\SimpleFork\Process(new TestRunnable());
$process->start();
$process->wait();

使用回调的过程

$process = new \Jenner\SimpleFork\Process(function(){
    for($i=0; $i<3; $i++){
        echo $i . PHP_EOL;
        sleep(1);
    }
});

$process->start();
$process->wait();

使用共享内存进行进程通信

class Producer extends \Jenner\SimpleFork\Process{
    public function run(){
        $cache = new \Jenner\SimpleFork\Cache\SharedMemory();
        //$cache = new \Jenner\SimpleFork\Cache\RedisCache();
        for($i = 0; $i<10; $i++){
            $cache->set($i, $i);
            echo "set {$i} : {$i}" . PHH_EOL;
        }
    }
}

class Worker extends \Jenner\SimpleFork\Process{
    public function run(){
        sleep(5);
        $cache = new \Jenner\SimpleFork\Cache\SharedMemory();
        //$cache = new \Jenner\SimpleFork\Cache\RedisCache();
        for($i=0; $i<10; $i++){
            echo "get {$i} : " . $cache->get($i) . PHP_EOL;
        }
    }
}

$producer = new Producer();

$worker = new Worker();

$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();

使用系统V消息队列进行进程通信

class Producer extends \Jenner\SimpleFork\Process
{
    public function run()
    {
        $queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
        //$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
        for ($i = 0; $i < 10; $i++) {
            echo getmypid() . PHP_EOL;
            $queue->put($i);
        }
    }
}

class Worker extends \Jenner\SimpleFork\Process
{
    public function run()
    {
        sleep(5);
        $queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
        //$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
        for ($i = 0; $i < 10; $i++) {
            $res = $queue->get();
            echo getmypid() . ' = ' . $i . PHP_EOL;
            var_dump($res);
        }
    }
}

$producer = new Producer();

$worker = new Worker();

$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();

使用信号量锁进行进程通信

class TestRunnable implements \Jenner\SimpleFork\Runnable
{

    /**
     * @var \Jenner\SimpleFork\Lock\LockInterface
     */
    protected $sem;

    public function __construct()
    {
        $this->sem = \Jenner\SimpleFork\Lock\Semaphore::create("test");
        //$this->sem = \Jenner\SimpleFork\Lock\FileLock::create("/tmp/test.lock");
    }

    /**
     * @return mixed
     */
    public function run()
    {
        for ($i = 0; $i < 20; $i++) {
            $this->sem->acquire();
            echo "my turn: {$i} " . getmypid() . PHP_EOL;
            $this->sem->release();
            sleep(1);
        }
    }
}

$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));

$pool->wait();

进程池管理进程

$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));

$pool->wait();

ParallelPool 管理进程

$fixed_pool = new \Jenner\SimpleFork\ParallelPool(new TestRunnable(), 10);
$fixed_pool->start();
$fixed_pool->keep(true);

FixedPool 管理进程

$pool = new \Jenner\SimpleFork\FixedPool(2);
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));

$pool->wait();

SinglePool 管理进程

$pool = new \Jenner\SimpleFork\SinglePool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));

$pool->wait();