react/promise-stream

ReactPHP之间Promise-land和Stream-land的缺失链接

资助包维护!
Open Collective

安装次数: 11,337,421

依赖: 45

建议者: 0

安全: 0

星标: 112

关注者: 10

分支: 13

公开问题: 1

v1.7.0 2023-12-13 11:32 UTC

This package is auto-updated.

Last update: 2024-09-13 13:00:02 UTC


README

CI status installs on Packagist

ReactPHP之间Promise-land和Stream-land的缺失链接。

目录

用法

这个轻量级的库仅包含几个简单的函数。所有函数都位于 React\Promise\Stream 命名空间下。

以下示例引用了所有函数,例如使用它们的完全限定名称

React\Promise\Stream\buffer(…);

从PHP 5.6+开始,您也可以将每个必需的函数导入到代码中,例如

use function React\Promise\Stream\buffer;

buffer(…);

或者,您也可以使用类似的导入语句

use React\Promise\Stream;

Stream\buffer(…);

buffer()

buffer(ReadableStreamInterface<string> $stream, ?int $maxLength = null): PromiseInterface<string> 函数可用于创建一个 Promise,该Promise将在流数据缓冲区被满足。

$stream = accessSomeJsonStream();

React\Promise\Stream\buffer($stream)->then(function (string $contents) {
    var_dump(json_decode($contents));
});

一旦流关闭,Promise将使用所有数据块连接成一个 string 来满足。

如果流已经关闭,Promise将使用一个空的 string 来满足。

如果流发生错误,Promise将使用一个 RuntimeException 来拒绝。

如果它被取消,Promise将使用一个 RuntimeException 来拒绝。

可选的 $maxLength 参数默认无限制。如果提供了最大长度,并且流在结束之前发出更多数据,Promise将使用一个 OverflowException 来拒绝。

$stream = accessSomeToLargeStream();

React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) {
    var_dump(json_decode($contents));
}, function ($error) {
    // Reaching here when the stream buffer goes above the max size,
    // in this example that is 1024 bytes,
    // or when the stream emits an error.
});

first()

first(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<mixed> 函数可用于创建一个 Promise,该Promise将在给定事件首次触发时满足。

$stream = accessSomeJsonStream();

React\Promise\Stream\first($stream)->then(function (string $chunk) {
    echo 'The first chunk arrived: ' . $chunk;
});

Promise将使用第一次发出的事件的 mixed 值来满足,如果事件没有传递任何数据,则为 null。如果您不传递自定义事件名称,则它将等待第一个 "data" 事件。对于常见的 ReadableStreamInterface<string> 类型流,这意味着它将使用包含第一个数据块的字符串来满足。

如果流发生错误,Promise将使用一个 RuntimeException 来拒绝 - 除非您正在等待 "error" 事件,在这种情况下它将满足。

一旦流关闭,Promise将使用一个 RuntimeException 来拒绝 - 除非您正在等待 "close" 事件,在这种情况下它将满足。

如果流已经关闭,Promise将使用一个 RuntimeException 来拒绝。

如果它被取消,Promise将使用一个 RuntimeException 来拒绝。

all()

all(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<array> 函数可用于创建一个 Promise,该Promise将使用所有事件数据来满足。

$stream = accessSomeJsonStream();

React\Promise\Stream\all($stream)->then(function (array $chunks) {
    echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
});

一旦流关闭,Promise将使用一个 array 来满足。数组将包含所有发出的事件或 null 值,如果事件没有传递任何数据。如果您不传递自定义事件名称,则它将等待所有 "data" 事件。对于常见的 ReadableStreamInterface<string> 类型流,这意味着它将使用包含所有数据块的字符串数组来满足。

如果流已经关闭,Promise将使用一个空的 array 来满足。

如果流发生错误,Promise将使用一个 RuntimeException 来拒绝。

如果它被取消,Promise将使用一个 RuntimeException 来拒绝。

unwrapReadable()

函数 unwrapReadable(PromiseInterface<ReadableStreamInterface<T>> $promise): ReadableStreamInterface<T> 可以用来解包一个将被 ReadableStreamInterface<T> 填充的 Promise

此函数立即返回一个可读流实例(实现 ReadableStreamInterface<T>),作为未来承诺解决的代理。一旦给定的 Promise 被一个 ReadableStreamInterface<T> 填充,其数据将被传输到输出流。

//$promise = someFunctionWhichResolvesWithAStream();
$promise = startDownloadStream($uri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$stream->on('data', function (string $data) {
    echo $data;
});

$stream->on('end', function () {
    echo 'DONE';
});

如果给定的 Promise 被拒绝,或者以非 ReadableStreamInterface 实例填充,则输出流将发出一个 error 事件并关闭。

$promise = startDownloadStream($invalidUri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});

给定的 $promise 应该是挂起的,即它不应该在调用此函数时已经解决。如果给定的 Promise 已经解决且不填充一个 ReadableStreamInterface 实例,则您将无法接收到 error 事件。

您可以在任何时候 close() 结果流,这将尝试 cancel() 挂起的承诺或尝试 close() 基础流。

$promise = startDownloadStream($uri);

$stream = React\Promise\Stream\unwrapReadable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});

unwrapWritable()

函数 unwrapWritable(PromiseInterface<WritableStreamInterface<T>> $promise): WritableStreamInterface<T> 可以用来解包一个将被 WritableStreamInterface<T> 填充的 Promise

此函数立即返回一个可写流实例(实现 WritableStreamInterface<T>),作为未来承诺解决的代理。对此实例的任何写入都将缓冲在内存中,直到承诺解决。一旦给定的 Promise 被一个 WritableStreamInterface<T> 填充,您写入代理的数据将被透明地转发到内部流。

//$promise = someFunctionWhichResolvesWithAStream();
$promise = startUploadStream($uri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$stream->write('hello');
$stream->end('world');

$stream->on('close', function () {
    echo 'DONE';
});

如果给定的 Promise 被拒绝,或者以非 WritableStreamInterface 实例填充,则输出流将发出一个 error 事件并关闭。

$promise = startUploadStream($invalidUri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});

给定的 $promise 应该是挂起的,即它不应该在调用此函数时已经解决。如果给定的 Promise 已经解决且不填充一个 WritableStreamInterface 实例,则您将无法接收到 error 事件。

您可以在任何时候 close() 结果流,这将尝试 cancel() 挂起的承诺或尝试 close() 基础流。

$promise = startUploadStream($uri);

$stream = React\Promise\Stream\unwrapWritable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});

安装

建议通过 Composer 安装此库。 Composer 初学者?

此项目遵循 SemVer。这将安装最新的支持版本。

composer require react/promise-stream:^1.7

有关版本升级的详细信息,请参阅 变更日志

此项目旨在在任何平台上运行,因此不需要任何 PHP 扩展,并支持从旧版 PHP 5.3 到当前 PHP 8+ 和 HHVM 的运行。强烈建议为此项目使用最新的支持版本。

测试

要运行测试套件,您首先需要克隆此仓库,然后通过 Composer 安装所有依赖项。

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

许可证

MIT,请参阅 许可文件