cubiche/async

异步库

dev-master 2017-03-16 11:29 UTC

README

Build Status Coverage Status Scrutinizer Code Quality

一个事件循环抽象层,基于 React PHP/Event Loop 构建,并为 PHP 提供了一个轻量级的 Promises/A+ 实现,灵感来源于 React PHP/Promise

安装

通过 Composer

$ composer require cubiche/async:dev-master

Promise API

Promises/A+ 建议将 Promise 描述为一个接口,用于与表示异步执行动作结果的对象交互,该动作可能在任何给定时间点完成或未完成。Promise 对象的目的是允许感兴趣的一方在任务完成时获取延迟任务的結果。

方法

  • PromiseInterface::then(callable $onFulfilled = null, callable $onRejected = null, callable $onNotify = null) – 无论 Promise 在何时被解析或拒绝,只要结果可用,就会异步调用 $onFulfilled$onRejected 回调之一。回调以单个参数调用:结果值或拒绝原因。此外,$onNotify 回调可以在 Promise 解析或拒绝之前被零次或多次调用,以提供进度指示。

此方法返回一个新的 Promise,该 Promise 通过 $onFulfilled$onRejected 回调的返回值解析或拒绝(除非该值是 Promise,在这种情况下,它使用 Promise 链在 Promise 中解析的值进行解析)。它还通过 $onNotify 回调的返回值进行通知。

  • PromiseInterface::done(callable $onFulfilled = null, callable $onRejected = null, callable $onNotify = null) – 此方法类似于 PromiseInterface::then,但不返回新的 Promise。如果 $onFulfilled$onRejected$onNotify 回调抛出异常,则会导致致命错误。
  • PromiseInterface::otherwise(callable $onRejected)PromiseInterface::then(null, $onRejected) 的缩写。
  • PromiseInterface::always(callable $onFulfilledOrRejected, callable $onNotify = null) – 允许您观察 Promise 的实现或拒绝,但无需修改最终值。这在释放资源或执行无论 Promise 被拒绝还是解析都需要执行的清理工作时非常有用。
  • PromiseInterface::state() – 返回 Promise 状态(State::PENDING()State::FULFILLED()State::REJECTED())。

Deferred API

Deferred 对象的目的是公开相关的 Promise 实例以及用于指示任务成功或失败完成以及任务状态的 API。

方法

  • DeferredInterface::resolve($value = null) – 使用 $value 解析派生的 Promise。
  • DeferredInterface::reject($reason = null) – 使用 $reason 拒绝派生的 Promise。
  • DeferredInterface::notify($state = null) – 在 Promise 执行的状态上提供更新。这可能在进行解析或拒绝之前被多次调用。
  • DeferredInterface::cancel() – 如果可能,使用 CancellationException 原因拒绝派生的 Promise。
  • DeferredInterface::promise() – 返回与该 deferred 关联的 Promise 对象。

基本用法

使用示例:与 React PHP/事件循环 结合使用

<?php

require 'vendor/autoload.php';

use Cubiche\Core\Async\Promise\Promises;
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer;

$loop = Factory::create();
$promise = asyncTask($loop);

$promise->done(function($message){
    echo $message. ' Done!'. PHP_EOL; 
});

$loop->run();

function asyncTask(LoopInterface $loop)
{
    $deferred = Promises::defer();
    
    $loop->addTimer(1, function(Timer $timer) use ($deferred) {
        $deferred->resolve('hello world!');
    });
    
    return $deferred->promise();
}

相同的使用示例:与 Cubiche/Async 结合使用

<?php

require 'vendor/autoload.php';

use Cubiche\Core\Async\Loop\Loop;
use Cubiche\Core\Async\Loop\LoopInterface;
use Cubiche\Core\Async\Loop\Timer\TimerInterface;

$loop = new Loop();

//timer is a Promise
$timer = asyncTask($loop);

$timer->done(function($message){
    echo $message. ' Done!'. PHP_EOL; 
});

$loop->run();

function asyncTask(LoopInterface $loop)
{
    return $loop->timeout(function(TimerInterface $timer) {
        return 'hello world!';
    }, 1);
}

矩阵乘法示例

<?php

require 'vendor/autoload.php';

use Cubiche\Core\Async\Loop\Loop;
use Cubiche\Core\Async\Loop\LoopInterface;
use Cubiche\Core\Async\Promise\Promises;

$loop = new Loop();

$n = 10;
$a = matrix($n);
$b = matrix($n);

$promise = mult($a, $b, $n, $loop);

$promise->done(function($c) use ($n){
    echo 'Result matrix:'. PHP_EOL;
    for ($i = 0; $i < $n; $i++){
        echo '('. implode(',', $c[$i]). ')' . PHP_EOL; 
    }
    echo 'Done!'. PHP_EOL; 
});

$loop->run();

function mult($a, $b, $n, LoopInterface $loop)
{
    $promises = array();
    $c = array();
    for ($i = 0; $i < $n; $i++){
        $c[$i] = array();
        for ($j = 0; $j < $n; $j++){
            //scheduling in random order
            $delay = (float)\rand()/(float)\getrandmax();
            $timer = $loop->timeout(function() use ($a, $b, $i, $j, $n){
                return multRowCol($a, $b, $i, $j, $n);
            }, $delay);
            
            $promises[] = $timer->then(function ($value) use(&$c, $i, $j){
                echo 'seting c['.$i. ', ' . $j . '] = '. $value . PHP_EOL;
                $c[$i][$j] = $value; 
            });
        }
    }
    
    return Promises::all($promises)->then(function () use (&$c){
        return $c;
    });
}

function multRowCol($a, $b, $i, $j, $n)
{
    echo 'calculating c['.$i. ', ' . $j . ']' . PHP_EOL;
    $sum = 0;
    for ($k = 0; $k < $n; $k++) {
        $sum += $a[$i][$k] * $b[$k][$j];
    }
    return $sum;
}

function matrix($n)
{
    $matrix = array();
    for ($i = 0; $i < $n; $i++){
        $matrix[$i] = array();
        for ($j = 0; $j < $n; $j++){
            $matrix[$i][$j] = \rand(1, 10);
        }
    }
    return $matrix;
}

React PHP 适配器示例

要运行此示例,您需要安装 React PHP/Dns

$ composer require react/dns:~0.4.0
<?php

require 'vendor/autoload.php';

use Cubiche\Core\Async\Loop\LoopAdapter;
use Cubiche\Core\Async\Promise\Promises;
use Cubiche\Core\Async\Promise\ThenableInterface;
use React\Dns\Resolver\Factory;
use React\Promise\PromiseInterface as ReactPromiseInterface;

class ThenableAdapter implements ThenableInterface{

    /**
     * @var ReactPromiseInterface
     */
    private $promise;
    
    /**
     * @param ReactPromiseInterface $promise
     */
    public function __construct(ReactPromiseInterface $promise)
    {
        $this->promise = $promise;
    }

    /**
     * {@inheritdoc}
     */
    public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onNotify = null)
    {
        $this->promise->then($onFulfilled, $onRejected, $onNotify);
    }
}

$loop = new LoopAdapter();
$factory = new Factory();
$dns = $factory->create('8.8.8.8', $loop);

$domains = array('github.com', 'google.com', 'amazon.com');
$promises = array();
foreach ($domains as $domain) {
    $reactPromise = $dns->resolve($domain);
    $promisor = Promises::promisor(new ThenableAdapter($reactPromise));
    $promises[$domain] = $promisor->promise();
}

//waiting for all promises and get the result
$hots = Promises::get(Promises::all($promises), $loop);
foreach ($hots as $domain => $host) {
    echo 'Domain: '. $domain. ' Host: '. $host. PHP_EOL;
}

##作者