alexanderc/threadator

此包已被弃用且不再维护。未建议替代包。

Thread[ator] 是尝试构建一个不错的 PHP 多线程库

安装: 33

依赖者: 0

建议者: 0

安全: 0

星星: 8

关注者: 3

分支: 0

开放问题: 0

语言:JavaScript

dev-master 2014-10-23 09:29 UTC

This package is not auto-updated.

Last update: 2020-10-12 11:16:47 UTC


README

Threadator 包的主要目标是提供一个简单的方式来在 PHP 中运行多线程应用程序。你可能已经注意到有其他类似的包,但是

  • 这是一个现代化的包(使用生成器、特质和其他语言糖 + 作为 composer 库构建)
  • 它提供了最多的原生实现(你所需要的只是 posix, pcntl)
  • 你可以完全控制你的线程(互斥和线程与主进程之间的双向通信)

安装

  • 通过 Composer

      "alexanderc/threadator": "dev-master"
    

基本用法

<?php
require '/path/to/vendor/autoload.php';

$runtime = new \Threadator\Runtime();
$factory = new \Threadator\Factory($runtime);

// instead of built in drivers names ["msgQueue", "redis"] you can use your own driver
// by providing fully qualified class name (with NS), like My\CommunicationDriver\TestDriver
// IMPORTANT: in order to get a stable work of communication- use "redis" instead of "msgQueue"
$communication = \Threadator\Communication\Communication::create($runtime, 'msgQueue' /* 'redis', ['127.0.0.1'] */ );
$runtime->setCommunication($communication);

// now we can create some threads
for($i = 0; $i < 5; $i++) {
	// you can also use your own thread implementation by calling $factory->create("You\\Thread\\Implementation\\TestThread");
	// IMPORTANT: your TestThread class should extend Threadator\Thread
	
    /** @var \Threadator\Thread $thread */
       $thread = $factory->createCallable(function($thread) {
              // create mutex
			  // for more mutex types check \Threadator\Mutex::T_* constants
              $mutex = $thread->createMutex("echo", \Threadator\Mutex::T_FUNCTION);
			  
			  // wait until we aquire the mutex
			  $mutex->waitAcquire();
			  
			  // do some work here...
			  sleep(mt_rand(1, 3));
			  echo "Running Thread #{$thread->getPid()}...\n";
			  
			  // wait until we get a message from the main process
              $thread->receiveMessage($message);
			  
			  // send back this message
              $thread->sendMessage("#{$thread->getPid()}: {$message}");
           });
}

echo "Main process #{$runtime->getPid()} running!\n";

// start all threads
$runtime->run();

// send a message to all threads
foreach($runtime->broadcastMessage(microtime(true)) as list($result, $thread)) {
	// if result == 1 than message was sent
    echo "Result for msg #{$thread->getPid()} -> {$result}\n";
}

// wait until we receive thread messages
$messages = [];
foreach($runtime->receiveMessage() as $result => $message) {
    if($result) {
        $messages[] = $message;
    }
}
echo "Thread messages: " . implode(", ", $messages) . "\n";

// wait until all the threads runs
$runtime->join();

exit("Main process #{$runtime->getPid()} stopped!\n");
For more examples check "test" folder

是否可扩展

Yes it is!

你可以轻松编写自己的通信驱动或线程实现。

如何扩展

如果你需要添加(例如)一个新的通信驱动 - 你只需要像这样扩展 Threadator\Communication\Driver

<?php
/**
 * @author AlexanderC <self@alexanderc.me>
 * @date 4/10/14
 * @time 2:04 PM
 */

use  Threadator\Communication\Driver\ADriver;


class TestDriver extends ADriver
{
    /**
     * @return void
     */
    protected function init()
    {
        // TODO: Implement init() method.
    }

    /**
     * @param int $key
     * @param mixed $message
     * @return bool
     */
    public function send($key, $message)
    {
        // TODO: Implement send() method.
    }

    /**
     * Try to get message, but do not block
     *
     * @param int $key
     * @param mixed $message
     * @return bool
     */
    public function touch($key, & $message)
    {
        // TODO: Implement touch() method.
    }

    /**
     * Block until the first message arrives
     *
     * @param int $key
     * @param mixed $message
     * @return bool
     */
    public function receive($key, & $message)
    {
        // TODO: Implement receive() method.
    }

} 	

待办事项

  • 创建单元测试
  • 实现更多通信驱动
  • ...你的建议

贡献者