react/async

ReactPHP 的异步工具和纤程

维护者

详细信息

github.com/reactphp/async

源代码

问题

资助包维护!
开放集体

安装 3,766,526

依赖者: 138

建议者: 4

安全: 0

星级: 193

观察者: 16

分支: 18

开放问题: 5

v4.3.0 2024-06-04 14:40 UTC

This package is auto-updated.

Last update: 2024-08-27 08:46:55 UTC


README

CI status installs on Packagist

ReactPHP 提供异步工具和纤程。

该库允许您管理异步控制流。它提供了一系列基于 Promise 的 API。您可以将回调声明为列表,并以异步方式顺序解决。React/Async 不会自动将阻塞代码更改为异步。您需要实际的事件循环以及与该事件循环交互的非阻塞库才能使其工作。只要您有一个在事件循环中运行的基于 Promise 的 API,就可以使用此库。

目录

用法

此轻量级库仅包含几个简单函数。所有函数都位于 React\Async 命名空间下。

以下示例使用完全限定名称引用所有函数,如下所示

React\Async\await(…);

从 PHP 5.6+ 开始,您还可以像这样将每个所需函数导入到您的代码中

use function React\Async\await;

await(…);

或者,您也可以使用类似这样的导入语句

use React\Async;

Async\await(…);

async()

函数 async(callable():(PromiseInterface<T>|T) $function): (callable():PromiseInterface<T>) 可以用于返回一个异步函数,该函数内部使用 await()

此函数专门设计用于补充 await() 函数。从调用代码的角度来看,await() 函数 可以视为 阻塞。您可以通过将其包装在 async() 函数调用中来避免此阻塞行为。此函数内的所有内容仍然会被阻塞,但此函数之外的所有内容都可以异步执行而不会阻塞。

Loop::addTimer(0.5, React\Async\async(function () {
    echo 'a';
    React\Async\await(React\Promise\Timer\sleep(1.0));
    echo 'c';
}));

Loop::addTimer(1.0, function () {
    echo 'b';
});

// prints "a" at t=0.5s
// prints "b" at t=1.0s
// prints "c" at t=1.5s

有关更多详细信息,请参阅 await() 函数

请注意,此函数仅与 await() 函数 配合使用。特别是,此函数不会“神奇”地将任何阻塞函数转换为非阻塞函数

Loop::addTimer(0.5, React\Async\async(function () {
    echo 'a';
    sleep(1); // broken: using PHP's blocking sleep() for demonstration purposes
    echo 'c';
}));

Loop::addTimer(1.0, function () {
    echo 'b';
});

// prints "a" at t=0.5s
// prints "c" at t=1.5s: Correct timing, but wrong order
// prints "b" at t=1.5s: Triggered too late because it was blocked

作为替代方案,您应始终确保与 await() 函数 以及一个返回 Promise 的异步 API 一起使用此函数,如上一个示例所示。

async() 函数专门为用作回调(例如事件循环计时器、事件监听器或 Promise 回调)的情况而设计。因此,它返回一个新函数,该函数包装了给定的 $function 而不是直接调用它并返回其值。

use function React\Async\async;

Loop::addTimer(1.0, async(function () { … }));
$connection->on('close', async(function () { … }));
$stream->on('data', async(function ($data) { … }));
$promise->then(async(function (int $result) { … }));

您可以调用此包装函数以使用给定的任何参数调用 $function。该函数始终返回一个 Promise,该 Promise 将使用 $function 返回的内容来满足。同样,如果从您的 $function 中抛出 ExceptionThrowable,它将返回一个 Promise。这允许您轻松创建基于 Promise 的函数

$promise = React\Async\async(function (): int {
    $browser = new React\Http\Browser();
    $urls = [
        'https://example.com/alice',
        'https://example.com/bob'
    ];

    $bytes = 0;
    foreach ($urls as $url) {
        $response = React\Async\await($browser->get($url));
        assert($response instanceof Psr\Http\Message\ResponseInterface);
        $bytes += $response->getBody()->getSize();
    }
    return $bytes;
})();

$promise->then(function (int $bytes) {
    echo 'Total size: ' . $bytes . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

上一个例子使用循环中的await()来突出如何极大地简化异步操作的消耗。同时,这个简单的例子没有利用并发执行,因为它将基本上在每次操作之间“await”。为了利用给定$function中的并发执行,您可以通过使用单个await()以及像这样的基于Promise的原始方法来“await”多个promise。

$promise = React\Async\async(function (): int {
    $browser = new React\Http\Browser();
    $urls = [
        'https://example.com/alice',
        'https://example.com/bob'
    ];

    $promises = [];
    foreach ($urls as $url) {
        $promises[] = $browser->get($url);
    }

    try {
        $responses = React\Async\await(React\Promise\all($promises));
    } catch (Exception $e) {
        foreach ($promises as $promise) {
            $promise->cancel();
        }
        throw $e;
    }

    $bytes = 0;
    foreach ($responses as $response) {
        assert($response instanceof Psr\Http\Message\ResponseInterface);
        $bytes += $response->getBody()->getSize();
    }
    return $bytes;
})();

$promise->then(function (int $bytes) {
    echo 'Total size: ' . $bytes . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

返回的promise是以一种方式实现的,可以在它仍然挂起时取消。取消挂起的promise将取消该fiber或任何嵌套fiber中的任何awaited promises。因此,以下示例将只输出ab并取消挂起的delay()。本例中的await()调用将抛出一个从已取消的delay()调用中抛出的RuntimeException,并通过fibers向上传递。

$promise = async(static function (): int {
    echo 'a';
    await(async(static function (): void {
        echo 'b';
        delay(2);
        echo 'c';
    })());
    echo 'd';

    return time();
})();

$promise->cancel();
await($promise);

await()

可以使用await(PromiseInterface<T> $promise): T函数来阻塞等待给定的$promise得到满足。

$result = React\Async\await($promise);

此函数仅在给定的$promise解决后才会返回,即已解决或被拒绝。当promise挂起时,从调用代码的角度来看,此函数可以被认为是阻塞的。您可以通过将其包裹在一个async()函数调用中来避免这种阻塞行为。此函数内部的所有内容仍然会被阻塞,但此函数之外的所有内容可以异步执行而不会阻塞。

Loop::addTimer(0.5, React\Async\async(function () {
    echo 'a';
    React\Async\await(React\Promise\Timer\sleep(1.0));
    echo 'c';
}));

Loop::addTimer(1.0, function () {
    echo 'b';
});

// prints "a" at t=0.5s
// prints "b" at t=1.0s
// prints "c" at t=1.5s

有关更多详细信息,请参阅async()函数。

一旦promise得到满足,此函数将返回promise解决的任何内容。

一旦promise被拒绝,这将抛出promise拒绝的任何内容。如果promise没有使用ExceptionThrowable拒绝,则此函数将抛出一个UnexpectedValueException

try {
    $result = React\Async\await($promise);
    // promise successfully fulfilled with $result
    echo 'Result: ' . $result;
} catch (Throwable $e) {
    // promise rejected with $e
    echo 'Error: ' . $e->getMessage();
}

coroutine()

可以使用coroutine(callable(mixed ...$args):(\Generator|PromiseInterface<T>|T) $function, mixed ...$args): PromiseInterface<T>函数来执行基于Generator的协程以“await”promise。

React\Async\coroutine(function () {
    $browser = new React\Http\Browser();

    try {
        $response = yield $browser->get('https://example.com/');
        assert($response instanceof Psr\Http\Message\ResponseInterface);
        echo $response->getBody();
    } catch (Exception $e) {
        echo 'Error: ' . $e->getMessage() . PHP_EOL;
    }
});

使用基于Generator的协程是直接使用底层promise API的替代方案。对于许多用例,这使得使用基于promise的API更加简单,因为它更接近于同步代码流。上面的例子执行了直接使用promise API的等效操作。

$browser = new React\Http\Browser();

$browser->get('https://example.com/')->then(function (Psr\Http\Message\ResponseInterface $response) {
    echo $response->getBody();
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

可以使用yield关键字“await”promise的解决。内部,它将整个给定的$function转换为Generator。这允许在promise解决时中断执行并从同一位置恢复。yield语句返回promise解决的任何内容。如果promise被拒绝,它将抛出一个ExceptionThrowable

coroutine()函数将始终返回一个promise,该promise将被你的$function返回的内容满足。同样,它将返回一个promise,如果你从你的$function中抛出一个ExceptionThrowable,它将被拒绝。这允许你轻松地创建基于Promise的函数。

$promise = React\Async\coroutine(function () {
    $browser = new React\Http\Browser();
    $urls = [
        'https://example.com/alice',
        'https://example.com/bob'
    ];

    $bytes = 0;
    foreach ($urls as $url) {
        $response = yield $browser->get($url);
        assert($response instanceof Psr\Http\Message\ResponseInterface);
        $bytes += $response->getBody()->getSize();
    }
    return $bytes;
});

$promise->then(function (int $bytes) {
    echo 'Total size: ' . $bytes . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

上一个例子使用循环中的yield语句来突出如何极大地简化异步操作的消耗。同时,这个简单的例子没有利用并发执行,因为它将基本上在每次操作之间“await”。为了利用给定$function中的并发执行,您可以通过使用单个yield以及像这样的基于Promise的原始方法来“await”多个promise。

$promise = React\Async\coroutine(function () {
    $browser = new React\Http\Browser();
    $urls = [
        'https://example.com/alice',
        'https://example.com/bob'
    ];

    $promises = [];
    foreach ($urls as $url) {
        $promises[] = $browser->get($url);
    }

    try {
        $responses = yield React\Promise\all($promises);
    } catch (Exception $e) {
        foreach ($promises as $promise) {
            $promise->cancel();
        }
        throw $e;
    }

    $bytes = 0;
    foreach ($responses as $response) {
        assert($response instanceof Psr\Http\Message\ResponseInterface);
        $bytes += $response->getBody()->getSize();
    }
    return $bytes;
});

$promise->then(function (int $bytes) {
    echo 'Total size: ' . $bytes . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

delay()

可以使用 delay(float $seconds): void 函数来延迟程序执行,延迟时间由 $seconds 参数指定。

React\Async\delay($seconds);

该函数将在给定的 $seconds 秒数过去后返回。如果没有其他事件附加到该循环,则其行为类似于 PHP 的 sleep() 函数

echo 'a';
React\Async\delay(1.0);
echo 'b';

// prints "a" at t=0.0s
// prints "b" at t=1.0s

与 PHP 的 sleep() 函数不同,此函数不一定停止整个进程线程的执行。相反,它允许事件循环运行同一循环中附加的任何其他事件,直到延迟返回。

echo 'a';
Loop::addTimer(1.0, function (): void {
    echo 'b';
});
React\Async\delay(3.0);
echo 'c';

// prints "a" at t=0.0s
// prints "b" at t=1.0s
// prints "c" at t=3.0s

这种行为在需要延迟特定例程的执行时特别有用,例如在构建简单的轮询或重试机制时。

try {
    something();
} catch (Throwable) {
    // in case of error, retry after a short delay
    React\Async\delay(1.0);
    something();
}

由于此函数仅在一段时间过去后返回,因此从调用代码的角度来看,它可以被认为是 阻塞的。您可以通过将其包装在 async() 函数 调用来避免此阻塞行为。此函数内的所有内容仍将被阻塞,但此函数之外的所有内容都可以异步执行,而不会阻塞。

Loop::addTimer(0.5, React\Async\async(function (): void {
    echo 'a';
    React\Async\delay(1.0);
    echo 'c';
}));

Loop::addTimer(1.0, function (): void {
    echo 'b';
});

// prints "a" at t=0.5s
// prints "b" at t=1.0s
// prints "c" at t=1.5s

有关更多详细信息,请参阅async()函数。

内部,$seconds 参数将用作循环的计时器,以便它在计时器触发之前继续运行。这意味着如果您传递一个非常小(或负数)的值,它仍然会启动一个计时器,因此将在将来可能的最早时间触发。

该函数的实现方式是,当它在 async() 函数 内运行时可以被取消。取消生成的承诺将清理任何挂起的计时器,并从挂起的延迟中抛出一个 RuntimeException,从而拒绝生成的承诺。

$promise = async(function (): void {
    echo 'a';
    delay(3.0);
    echo 'b';
})();

Loop::addTimer(2.0, function () use ($promise): void {
    $promise->cancel();
});

// prints "a" at t=0.0s
// rejects $promise at t=2.0
// never prints "b"

parallel()

可以使用以下方式使用 parallel(iterable> $tasks): PromiseInterface>> 函数

<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

React\Async\parallel([
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for a whole second');
            });
        });
    },
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for another whole second');
            });
        });
    },
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for yet another whole second');
            });
        });
    },
])->then(function (array $results) {
    foreach ($results as $result) {
        var_dump($result);
    }
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

series()

可以使用以下方式使用 series(iterable>> $tasks): PromiseInterface>> 函数

<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

React\Async\series([
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for a whole second');
            });
        });
    },
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for another whole second');
            });
        });
    },
    function () {
        return new Promise(function ($resolve) {
            Loop::addTimer(1, function () use ($resolve) {
                $resolve('Slept for yet another whole second');
            });
        });
    },
])->then(function (array $results) {
    foreach ($results as $result) {
        var_dump($result);
    }
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

waterfall()

可以使用以下方式使用 waterfall(iterable>> $tasks): PromiseInterface> 函数

<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

$addOne = function ($prev = 0) {
    return new Promise(function ($resolve) use ($prev) {
        Loop::addTimer(1, function () use ($prev, $resolve) {
            $resolve($prev + 1);
        });
    });
};

React\Async\waterfall([
    $addOne,
    $addOne,
    $addOne
])->then(function ($prev) {
    echo "Final result is $prev\n";
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

待办事项

  • 实现 queue() 函数

安装

推荐使用 Composer 安装此库。 您是 Composer 新手?

此项目遵循 SemVer。这将安装从该分支的最新支持的版本

composer require react/async:^4.3

有关版本升级的详细信息,请参阅 CHANGELOG

此项目旨在在任何平台上运行,因此不需要任何 PHP 扩展,并支持在 PHP 8.1+ 上运行。强烈建议为此项目使用最新的支持 PHP 版本

我们致力于提供长期支持(LTS)选项,并提供平滑的升级路径。如果您使用的是较旧的 PHP 版本,您可以使用 3.x 分支(PHP 7.1+)或 2.x 分支(PHP 5.3+),这两个分支都提供兼容的 API,但未利用新语言功能。您可以同时针对多个版本进行目标定位,以支持更广泛的 PHP 版本,如下所示

composer require "react/async:^4 || ^3 || ^2"

测试

要运行测试套件,您首先需要克隆此存储库,然后通过 Composer 安装所有依赖项

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

除此之外,我们使用PHPStan的最大级别来确保整个项目中的类型安全。

vendor/bin/phpstan

许可

遵循MIT许可,见许可文件

此项目深受async.js的影响。