leocavalcante/swoole-futures

Futures + async/await 用于 PHP 的 Swoole 并发运行时。

v0.1.0 2020-02-29 00:59 UTC

This package is auto-updated.

Last update: 2024-09-09 04:31:05 UTC


README

https://github.com/leocavalcante/swoole-futures/actions?query=workflow%3ACI https://shepherd.dev/github/leocavalcante/swoole-futures

⏳ 为 PHP 的 Swoole 异步运行时提供的 Futures、Streams 和 Async/Await。

灵感来自 futures Crate,它是 Rust 的 Tokio 异步运行时的实现。

它建立在 Swoole 的协程系统之上,没有特别的魔法,只有糖。

安装

composer require leocavalcante/swoole-futures

使用方法

异步 / 等待

以不同于 Swoole 协程的另一种风格创建并等待异步计算。

$future = Futures\async(fn() => 1);
$result = $future->await(); // 1

Futures 是懒加载的,只有当您调用 await 时才会运行。

连接

将一系列 Futures 连接到单个 Future 中,该 Future 等待一系列结果。

$slow_rand = function (): int {
    Co::sleep(3);
    return rand(1, 100);
};

$n1 = Futures\async($slow_rand);
$n2 = Futures\async($slow_rand);
$n3 = Futures\async($slow_rand);

$n = Futures\join([$n1, $n2, $n3]);

print_r($n->await());

这需要 3 秒钟,而不是 9 秒,Futures 并发运行!(顺序不可保证)

竞赛

返回第一个完成的 Future 的结果。

use Swoole\Coroutine\Http\Client;

$site1 = Futures\async(function() {
    $client = new Client('www.google.com', 443, true);
    $client->get('/');
    return $client->body;
});

$site2 = Futures\async(function() {
    $client = new Client('www.swoole.co.uk', 443, true);
    $client->get('/');
    return $client->body;
});

$site3 = Futures\async(function() {
    $client = new Client('leocavalcante.dev', 443, true);
    $client->get('/');
    return $client->body;
});

$first_to_load = Futures\race([$site1, $site2, $site3]);

echo $first_to_load;

还有一个 Futures\select 别名。

异步映射

将数组映射到一系列并发运行的 Futures 中。

$list = [1, 2, 3];
$multiply = fn(int $a) => fn(int $b) => $a * $b;
$double = $multiply(2);

$doubles = Futures\join(Futures\async_map($list, $double))->await();

print_r($doubles);

然后

对 Future 的步骤进行排序,是 join 的顺序版本

use function Futures\async;

$future = async(fn() => 2)
    ->then(fn(int $i) => async(fn() => $i + 3))
    ->then(fn(int $i) => async(fn() => $i * 4))
    ->then(fn(int $i) => async(fn() => $i - 5));

echo $future->await(); // 15

在操作之间从 sinklisten 流转值/事件。

$stream = Futures\stream()
    ->map(fn($val) => $val + 1)
    ->filter(fn($val) => $val % 2 === 0)
    ->map(fn($val) => $val * 2)
    ->listen(fn($val) => print("$val\n")); // 4 8 12 16 20

foreach (range(0, 9) as $n) {
    $stream->sink($n);
}

MIT © 2020