react / promise-stream
ReactPHP之间Promise-land和Stream-land的缺失链接
Requires
- php: >=5.3
- react/promise: ^3 || ^2.1 || ^1.2
- react/stream: ^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6
Requires (Dev)
- phpunit/phpunit: ^9.6 || ^5.7 || ^4.8.36
README
ReactPHP之间Promise-land和Stream-land的缺失链接。
目录
用法
这个轻量级的库仅包含几个简单的函数。所有函数都位于 React\Promise\Stream
命名空间下。
以下示例引用了所有函数,例如使用它们的完全限定名称
React\Promise\Stream\buffer(…);
从PHP 5.6+开始,您也可以将每个必需的函数导入到代码中,例如
use function React\Promise\Stream\buffer; buffer(…);
或者,您也可以使用类似的导入语句
use React\Promise\Stream; Stream\buffer(…);
buffer()
buffer(ReadableStreamInterface<string> $stream, ?int $maxLength = null): PromiseInterface<string>
函数可用于创建一个 Promise
,该Promise将在流数据缓冲区被满足。
$stream = accessSomeJsonStream(); React\Promise\Stream\buffer($stream)->then(function (string $contents) { var_dump(json_decode($contents)); });
一旦流关闭,Promise将使用所有数据块连接成一个 string
来满足。
如果流已经关闭,Promise将使用一个空的 string
来满足。
如果流发生错误,Promise将使用一个 RuntimeException
来拒绝。
如果它被取消,Promise将使用一个 RuntimeException
来拒绝。
可选的 $maxLength
参数默认无限制。如果提供了最大长度,并且流在结束之前发出更多数据,Promise将使用一个 OverflowException
来拒绝。
$stream = accessSomeToLargeStream(); React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) { var_dump(json_decode($contents)); }, function ($error) { // Reaching here when the stream buffer goes above the max size, // in this example that is 1024 bytes, // or when the stream emits an error. });
first()
first(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<mixed>
函数可用于创建一个 Promise
,该Promise将在给定事件首次触发时满足。
$stream = accessSomeJsonStream(); React\Promise\Stream\first($stream)->then(function (string $chunk) { echo 'The first chunk arrived: ' . $chunk; });
Promise将使用第一次发出的事件的 mixed
值来满足,如果事件没有传递任何数据,则为 null
。如果您不传递自定义事件名称,则它将等待第一个 "data" 事件。对于常见的 ReadableStreamInterface<string>
类型流,这意味着它将使用包含第一个数据块的字符串来满足。
如果流发生错误,Promise将使用一个 RuntimeException
来拒绝 - 除非您正在等待 "error" 事件,在这种情况下它将满足。
一旦流关闭,Promise将使用一个 RuntimeException
来拒绝 - 除非您正在等待 "close" 事件,在这种情况下它将满足。
如果流已经关闭,Promise将使用一个 RuntimeException
来拒绝。
如果它被取消,Promise将使用一个 RuntimeException
来拒绝。
all()
all(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<array>
函数可用于创建一个 Promise
,该Promise将使用所有事件数据来满足。
$stream = accessSomeJsonStream(); React\Promise\Stream\all($stream)->then(function (array $chunks) { echo 'The stream consists of ' . count($chunks) . ' chunk(s)'; });
一旦流关闭,Promise将使用一个 array
来满足。数组将包含所有发出的事件或 null
值,如果事件没有传递任何数据。如果您不传递自定义事件名称,则它将等待所有 "data" 事件。对于常见的 ReadableStreamInterface<string>
类型流,这意味着它将使用包含所有数据块的字符串数组来满足。
如果流已经关闭,Promise将使用一个空的 array
来满足。
如果流发生错误,Promise将使用一个 RuntimeException
来拒绝。
如果它被取消,Promise将使用一个 RuntimeException
来拒绝。
unwrapReadable()
函数 unwrapReadable(PromiseInterface<ReadableStreamInterface<T>> $promise): ReadableStreamInterface<T>
可以用来解包一个将被 ReadableStreamInterface<T>
填充的 Promise
。
此函数立即返回一个可读流实例(实现 ReadableStreamInterface<T>
),作为未来承诺解决的代理。一旦给定的 Promise 被一个 ReadableStreamInterface<T>
填充,其数据将被传输到输出流。
//$promise = someFunctionWhichResolvesWithAStream(); $promise = startDownloadStream($uri); $stream = React\Promise\Stream\unwrapReadable($promise); $stream->on('data', function (string $data) { echo $data; }); $stream->on('end', function () { echo 'DONE'; });
如果给定的 Promise 被拒绝,或者以非 ReadableStreamInterface
实例填充,则输出流将发出一个 error
事件并关闭。
$promise = startDownloadStream($invalidUri); $stream = React\Promise\Stream\unwrapReadable($promise); $stream->on('error', function (Exception $error) { echo 'Error: ' . $error->getMessage(); });
给定的 $promise
应该是挂起的,即它不应该在调用此函数时已经解决。如果给定的 Promise 已经解决且不填充一个 ReadableStreamInterface
实例,则您将无法接收到 error
事件。
您可以在任何时候 close()
结果流,这将尝试 cancel()
挂起的承诺或尝试 close()
基础流。
$promise = startDownloadStream($uri); $stream = React\Promise\Stream\unwrapReadable($promise); $loop->addTimer(2.0, function () use ($stream) { $stream->close(); });
unwrapWritable()
函数 unwrapWritable(PromiseInterface<WritableStreamInterface<T>> $promise): WritableStreamInterface<T>
可以用来解包一个将被 WritableStreamInterface<T>
填充的 Promise
。
此函数立即返回一个可写流实例(实现 WritableStreamInterface<T>
),作为未来承诺解决的代理。对此实例的任何写入都将缓冲在内存中,直到承诺解决。一旦给定的 Promise 被一个 WritableStreamInterface<T>
填充,您写入代理的数据将被透明地转发到内部流。
//$promise = someFunctionWhichResolvesWithAStream(); $promise = startUploadStream($uri); $stream = React\Promise\Stream\unwrapWritable($promise); $stream->write('hello'); $stream->end('world'); $stream->on('close', function () { echo 'DONE'; });
如果给定的 Promise 被拒绝,或者以非 WritableStreamInterface
实例填充,则输出流将发出一个 error
事件并关闭。
$promise = startUploadStream($invalidUri); $stream = React\Promise\Stream\unwrapWritable($promise); $stream->on('error', function (Exception $error) { echo 'Error: ' . $error->getMessage(); });
给定的 $promise
应该是挂起的,即它不应该在调用此函数时已经解决。如果给定的 Promise 已经解决且不填充一个 WritableStreamInterface
实例,则您将无法接收到 error
事件。
您可以在任何时候 close()
结果流,这将尝试 cancel()
挂起的承诺或尝试 close()
基础流。
$promise = startUploadStream($uri); $stream = React\Promise\Stream\unwrapWritable($promise); $loop->addTimer(2.0, function () use ($stream) { $stream->close(); });
安装
建议通过 Composer 安装此库。 Composer 初学者?
此项目遵循 SemVer。这将安装最新的支持版本。
composer require react/promise-stream:^1.7
有关版本升级的详细信息,请参阅 变更日志。
此项目旨在在任何平台上运行,因此不需要任何 PHP 扩展,并支持从旧版 PHP 5.3 到当前 PHP 8+ 和 HHVM 的运行。强烈建议为此项目使用最新的支持版本。
测试
要运行测试套件,您首先需要克隆此仓库,然后通过 Composer 安装所有依赖项。
composer install
要运行测试套件,请转到项目根目录并运行
vendor/bin/phpunit
许可证
MIT,请参阅 许可文件。