sue/coroutine

基于sue/event-loop的协程组件提供

1.3.0 2023-10-25 04:18 UTC

This package is auto-updated.

Last update: 2024-09-25 06:14:58 UTC


README

基于sue/event-loop的协程组件提供

什么是sue\coroutine

sue\coroutine是基于sue\event-loop事件循环生态的协程方案,底层是php的迭代器,但是针对php5版本做了兼容处理以及一些针对协程的控制逻辑。sue\event-loop是基于流程的ReactPHP进行的开发

什么是ReactPHP?

ReactPHP是一款基于PHP的事件驱动的组件。核心是提供EventLoop,然后提供基于EventLoop上的各种组件,比如I/O处理等。sue/event-loop组件也是基于ReactPHP提供的EventLoop

环境需求

  • php >= 5.6.4

目录

安装

$ composer require sue\coroutine 安装

要求

php: >= 5.6.0

PHP5兼容方案

sue\coroutine是基于php generator实现。php7版本开始允许在迭代器中加入return用来控制返回,但是在php5的迭代器中使用return会报错。所以php5可以用Sue\Coroutine\SystemCall\returnValue来代替return

co(function () {
    if (someCondition()) {
        yield \Sue\Coroutine\SystemCall\returnValue(false);
        //以上语句等同于在php7的 return false
    }

    yield someAsyncPromise();
    doSomethingAfterPromiseResolved();
});

快速入门示例

use React\Promise\Deferred;

use function Sue\EventLoop\loop;
use function Sue\EventLoop\setTimeout;
use function Sue\Coroutine\co;


$deferred = new Deferred();
setTimeout(3, function () use ($deferred) {
    $deferred->resolve('foo');
})

//1. 传统基于thenable方法处理promise
$deferred->promise()->then(function ($value) {
    echo "promise value: {$value}\r\n";
});

//2. 使用协程的方式处理promise
co(function ($promise) {
    echo "start waiting promise to be resolved\r\n";
    $value = yield $promise; //协程会在此处暂停,直到promise被resolve或者reject
    echo "promise value: {$value}\r\n"
}, $deferred->promise());
loop()->run();

函数

co

\Sue\Coroutine\co($callable) 可以将一段callable且是迭代器的代码以协程方式执行,如果callable非迭代器(generator)的话,那么会直接返回结果,使用方法如下

use function Sue\EventLoop\loop;
use function Sue\Coroutine\co;

$callable = function ($worker_id) {
    $count = 3;
    while ($count--) {
        echo "{$worker_id}: " . yield $count . "\r\n";
    }
};
co($callable, 'foo');
co($callable, 'bar');
loop()->run();
/** expect out:
 foo: 2
 bar: 2
 foo: 1
 bar: 1
 foo: 0
 bar: 0
**/

coAs

\Sue\Coroutine\coAs($coroutine_class, $callable)方法同co,区别是可以使用自定义的协程class(需要继承\Sue\Coroutine\Coroutine)。用此方法启动的协程中的子协程也会自动继承父级使用的自定义协程class

namespace App\Custom;

class CustomCoroutine extends \Sue\Coroutine\Coroutine
{
    public function get()
    {
        $result = parent::get();
        Log::info('result got', ['data' => $result]);
    }
}
coAs(\App\Custom\CustomCoroutine::class, $callable);

go

*** 不要使用,计划未来移除 *** Sue\Coroutine\go($callable)方法和作用同co(),只是没有返回值

async

\Sue\Coroutine\async($callable, $timeout)方法可以临时启动一个eventloop来执行一段协程的代码,当协程执行完毕或者超时后eventloop将被关闭。该方法适用于传统的阻塞php模型(比如php-fpm)中使用异步的特性。方法返回协程返回值或者抛出异常 *** 该方法无法在一个已启动eventloop中使用 ***

try {
    $timeout = 15;
    $result = async(function () {
        $promise_a = someHeavyOperation();
        $promise_b = someHeavyOperation();
        //让两个promise并行处理
        list($value_a, $value_b) = yield [$promise_a, $promise_b];
        yield returnValue(['a' => $value_a, 'b' => $value_b]);
    }, $timeout);
    doSomething($result['a']);
    doSomething($result['b']);
    //handle result
} catch (Throwable $e) {
    //error handle
}

defer

Sue\Coroutine\defer($interval, $callable, ...$callable_params) 可以延迟一段时间再执行协程

use function Sue\EventLoop\loop;
use function Sue\EventLoop\setInterval;
use function Sue\Coroutine\defer;

setInterval(1, function () {
    echo "tick\r\n";
});
$callable = function ($worker_id) {
    $count = 3;
    while ($count--) {
        echo "{$worker_id}: " . yield $count . "\r\n";
    }
};
defer(5, $callable, 'foo');
defer(5, $callable, 'bar');
loop()->run();
/** expect out:
 tick
 tick
 tick
 tick
 tick
 foo: 2
 bar: 2
 foo: 1
 bar: 1
 foo: 0
 bar: 0
 tick
 tick
 ...
**/

pause

Sue\Coroutine\pause($seconds)可以生成一条系统指令,效果是让当前协程暂停执行并进行休眠,之后继续执行

use Sue\Coroutine\SystemCall;

use function Sue\Coroutine\SystemCall\sleep;
use function Sue\EventLoop\loop;
use function Sue\EventLoop\setInterval;

setInterval(1, function () {
    echo "tick\r\n";
});
$callable = function ($worker_id) {
    echo "before-sleep\n";
    yield SystemCall\sleep(3); //协程会在这里sleep 3秒后再执行
    $count = 3;
    while ($count--) {
        echo "{$worker_id}: " . yield $count . "\r\n";
    }
};
co($callable, 'foo');
loop()->run();
/** expect out:
 before-sleep
 tick
 tick
 tick
 foo: 2
 foo: 1
 foo: 0
 tick
 tick
 ...
**/

sleep

建议用\Sue\Coroutine\SystemCall\pause方法代替,效果一致,此方法未来移除 Sue\Coroutine\SystemCall\sleep($seconds)效果同\Sue\Coroutine\SystemCall\pause($seconds)

timeout

\Sue\Coroutine\SystemCall\timeout($seconds)生成一个系统指令,可以为当前协程设置最大运行时间,如果超过运行时间,则会抛出异常

use React\Promise\Deferred;
use Sue\Coroutine\SystemCall;
use Sue\Coroutine\Exceptions\TimeoutException;

use function Sue\EventLoop\loop;
use function Sue\EventLoop\setTimeout;
use function Sue\Coroutine\co;

$deferred = new Deferred();
$loop->addTimer(3, function () use ($deferred) {
    $deferred->resolve('foo'); //3秒后promise fulfill
});
$promise = $deferred->promise();

$children = (function () use ($promise) {
    yield SystemCall\timeout(2); //当前协程最多运行2秒
    return yield $promise;
})();

co(function () use ($children) {
    try {
        yield $children;
    } catch (TimeoutException $e) {
        //子协程超出最大运行时间时的异常处理
    }
});

loop()->run();

cancel

Sue\Coroutine\SystemCall\cancel($message, $code)生成一个系统指令,可以取消当前协程及其子协程,并抛出异常

use Sue\Coroutine\SystemCall;
use Sue\Coroutine\Exceptions\CancelException;

use function Sue\Coroutine\co;
use function Sue\EventLoop\loop;

$children = (function () {
    if (someConditionNotMatch()) {
        yield SystemCall\cancel('condition not match', 500);
    }
});
co(function () use ($children) {
    try {
        yield $children;
    } catch (CancelException $e) {
        //子协程取消异常处理
    }
});
loop()->run();

returnValue

*** php7以上版本可以直接用return *** Sue\Coroutine\SystemCall\returnValue($value)生成一条系统指令,可以中止当前协程及子协程,并返回value值

$children = (function () {
    if (someCondition()) {
        yield \Sue\Coroutine\SystemCall\returnValue\returnValue('foo');
        //等同于php7+中的: return 'foo';
    }

    yield someAsyncPromise();
    return 'bar';
})();

co(function () use ($children) {
    $name = yield $children;
});

tests

先安装依赖

composer install

然后执行单元测试

./vendor/bin/phpunit

许可证

MIT许可证 (MIT)

版权所有 (c) 2023 张东海

本许可证授予任何获得此软件及其相关文档副本(“软件”)的人免费使用软件的权利,不受任何限制,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本,并允许向提供软件的人提供使用软件的权利,前提是必须遵守以下条件

上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。

软件按“现状”提供,不提供任何形式的保证,明示或暗示,包括但不限于对适销性、特定用途适用性和非侵权的保证。在任何情况下,作者或版权所有者均不对任何索赔、损害或其他责任负责,无论是由合同、侵权或其他行为引起的,无论是否与软件或其使用有关或与之相关。