brunonatali/stream

ReactPHP中用于非阻塞I/O的基于事件驱动的可读和可写流,具有特定的套接字实现。

dev-master 2019-09-25 03:22 UTC

This package is auto-updated.

Last update: 2024-09-25 15:56:02 UTC


README

  • 从 'ReactPHP' 原封不动复制,并对一些测试函数和特定使用实现进行了更改。
  • 删除了原始的 'tests' 和 'examples' 文件夹。

Build Status

ReactPHP中用于非阻塞I/O的基于事件驱动的可读和可写流。

为了使EventLoop更容易使用,此组件引入了强大的“流”概念。流允许您以小批量处理大量数据(例如多吉字节文件下载),而不必一次将所有内容存储在内存中。它们与PHP本身中的流非常相似,但具有更适合异步、非阻塞I/O的接口。

目录

流的使用

ReactPHP在其生态系统中使用“流”的概念,以提供对任意数据内容和大小的流的一致性高级抽象。虽然流本身是一个相当低级的概念,但它可以用作强大的抽象,在顶层构建组件和协议。

如果您对这个概念不熟悉,可以将它们想象成水管:您可以从源消费水,或者您可以生产水并将其(通过管道)传输到任何目的地(汇)。

同样,流可以是

  • 可读的(例如 STDIN 终端输入)或
  • 可写的(例如 STDOUT 终端输出)或
  • 全双工的(可读和可写,例如TCP/IP连接)

因此,此包定义了以下三个接口

ReadableStreamInterface

ReadableStreamInterface 负责提供只读流和全双工流的可读端的接口。

除了定义一些方法之外,此接口还实现了 EventEmitterInterface,允许您响应某些事件。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受与文档中描述的精确事件参数。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略并没有任何效果,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义才能被视为良好行为的流。

请注意,此接口的高级实现可以选择定义具有专用语义的额外事件,这些语义不是作为此低级流规范的一部分定义的。对这些事件语义的遵守超出此接口的范围,因此您可能还需要参考此类高级实现的文档。

data事件

当从此源流读取/接收数据时,将发出 data 事件。该事件接收一个混合参数作为传入数据。

$stream->on('data', function ($data) {
    echo $data;
});

此事件可能被触发多次,如果整个流没有发送任何数据,则可能一次也不会触发。它不应该在endclose事件之后触发。

给定的$data参数可以是混合类型,但通常建议它应该是一个string值或可以使用允许表示为string的类型,以实现最大兼容性。

许多常见的流(如TCP/IP连接或基于文件的流)将作为string值的块来触发接收到的原始(二进制)有效负载数据。

由于这种基于流的特性,发送者可能发送任意数量的不同大小的块。无法保证这些块将以发送者意图发送的精确帧格式接收。换句话说,许多底层协议(如TCP/IP)以从单个字节到几十千字节之间的块来传输数据。您可能需要在这些底层数据块上应用更高层的协议,以实现正确的消息帧。

end事件

当源流成功到达流的末尾(EOF)时,将触发end事件。

$stream->on('end', function () {
    echo 'END';
});

根据是否检测到成功的结束,此事件应该只触发一次或根本不触发。它不应该在之前的endclose事件之后触发。如果流因为非成功结束而关闭(例如,在之前的error事件之后),则不得触发此事件。

流结束之后,它必须切换到非可读模式,也请参阅isReadable()

只有当成功到达end时,此事件才会触发,而不是如果流被不可恢复的错误或显式关闭中断。并非所有流都了解“成功结束”这一概念。许多用例涉及检测流何时关闭(终止),在这种情况下,您应使用close事件。在流触发end事件后,通常会跟随一个close事件。

许多常见的流(如TCP/IP连接或基于文件的流)会在远程端关闭连接或成功读取到文件末尾(EOF)时触发此事件。

请注意,不要将此事件与end()方法混淆。此事件定义了从源流成功读取结束,而end()方法定义了向目标流写入成功的结束。

error事件

一旦发生致命错误,通常在尝试从流中读取时,将触发error事件。事件接收一个用于错误实例的单一Exception参数。

$server->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

在流检测到致命错误(如致命传输错误或意外的data或过早的end事件)后,应该只触发一次此事件。它不应该在之前的errorendclose事件之后触发。如果这不是致命错误条件(例如,没有造成数据丢失的暂时性网络问题),则不得触发此事件。

流发生错误后,它必须关闭流,因此应该跟随一个close事件,然后切换到非可读模式,也请参阅close()isReadable()

许多常见的流(如TCP/IP连接或基于文件的流)仅处理数据传输,并不假设数据边界(如意外的data或过早的end事件)。换句话说,许多底层协议(如TCP/IP)可能只选择在发生一次致命传输错误后触发此事件,然后响应地关闭(终止)流。

如果此流是DuplexStreamInterface,您还应注意到流的可写端也实现了error事件。换句话说,在读取或写入流的过程中可能发生错误,这将导致相同的错误处理。

close事件

当流关闭(终止)时,将触发close事件。

$stream->on('close', function () {
    echo 'CLOSED';
});

此事件应根据流是否终止而发出一次或永不发出。它不应在之前的 close 事件之后发出。

流关闭后,它必须切换到非可读模式,也请参阅 isReadable()

end 事件不同,此事件应在流关闭时发出,无论这是由于不可恢复的错误隐式发生还是由于任一方显式关闭流。如果您只想检测一个 成功的 结束,请改用 end 事件。

许多常见的流(如TCP/IP连接或基于文件的流)可能会选择在读取一个 成功的 end 事件或致命传输 error 事件后发出此事件。

如果此流是 DuplexStreamInterface,您还应注意到流的可写端也实现了 close 事件。换句话说,在收到此事件后,流必须切换到非可写和非可读模式,也请参阅 isWritable()。请注意,此事件不应与 end 事件混淆。

isReadable()

isReadable(): bool 方法可用于检查此流是否处于可读状态(尚未关闭)。

此方法可用于检查流是否仍然接受传入数据事件或是否已经结束或关闭。一旦流变为非可读,不应再发出进一步的 dataend 事件。

assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

成功打开的流始终必须以可读模式开始。

一旦流结束或关闭,它必须切换到非可读模式。这可以随时发生,可以是显式通过 close() 或隐式由于远程关闭或不可恢复的传输错误。一旦流切换到非可读模式,它不得再切换回可读模式。

如果此流是 DuplexStreamInterface,您还应注意到流的可写端也实现了 isWritable() 方法。除非这是一个半开双工流,否则它们通常具有相同的返回值。

pause()

pause(): void 方法可用于暂停读取传入数据事件。

从事件循环中移除数据源文件描述符。这允许您节流传入数据。

除非另有说明,否则成功打开的流不应以暂停状态开始。

一旦流被暂停,不应再发出进一步的 dataend 事件。

$stream->pause();

$stream->on('data', assertShouldNeverCalled());
$stream->on('end', assertShouldNeverCalled());

此方法仅提供咨询,尽管通常不推荐,但流可能会继续发出 data 事件。

您可以通过再次调用 resume() 来继续处理事件。

请注意,这两种方法可以多次调用,特别是多次调用 pause() 应没有任何效果。

另请参阅 resume()

resume()

resume(): void 方法可用于恢复读取传入数据事件。

在之前的 pause() 之后重新连接数据源。

$stream->pause();

$loop->addTimer(1.0, function () use ($stream) {
    $stream->resume();
});

请注意,这两种方法可以多次调用,特别是没有先前的 pause() 调用 resume() 应没有任何效果。

另请参阅 pause()

pipe()

pipe(WritableStreamInterface $dest, array $options = []) 方法可用于将所有数据从此可读源管道传输到指定的可写目标。

自动将所有传入数据发送到目标。根据目标可以处理的内容自动节流源。

$source->pipe($dest);

同样,您也可以将实现 DuplexStreamInterface 的实例管道传输到自身,以将接收到的所有数据写回。这可能是一个有用的功能,用于TCP/IP回声服务。

$connection->pipe($connection);

此方法返回目标流原样,可用于设置管道流链。

$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);

默认情况下,这将在源流发出 end 事件时在目标流上调用 end()。这可以通过以下方式禁用

$source->pipe($dest, array('end' => false));

请注意,这仅适用于 end 事件。如果源流上发生 error 或显式的 close 事件,您必须手动关闭目标流。

$source->pipe($dest);
$source->on('close', function () use ($dest) {
    $dest->end('BYE!');
});

如果源流不可读(处于关闭状态),则此操作为无操作(NO-OP)。

$source->close();
$source->pipe($dest); // NO-OP

如果目标流不可写(处于关闭状态),则将简单地节流(暂停)源流。

$dest->close();
$source->pipe($dest); // calls $source->pause()

同样,如果目标流在管道仍然活动时被关闭,它也将节流(暂停)源流。

$source->pipe($dest);
$dest->close(); // calls $source->pause()

一旦管道设置成功,目标流必须发出一个带有此源流的 pipe 事件作为事件参数。

close()

可以使用 close(): void 方法来关闭流(强制关闭)。

此方法可用于(强制)关闭流。

$stream->close();

一旦流关闭,它应该发出一个 close 事件。请注意,此事件不应该被触发多次,尤其是在调用此方法多次时。

调用此方法后,流必须切换到非可读模式,请参阅 isReadable()。这意味着不应该再发出任何 dataend 事件。

$stream->close();
assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

如果此流是 DuplexStreamInterface,您还应该注意流的可写部分也实现了 close() 方法。换句话说,在调用此方法后,流必须切换到非可写和非可读模式,请参阅 isWritable()。请注意,此方法不应与 end() 方法混淆。

WritableStreamInterface

WritableStreamInterface 负责提供写入仅流和双工流的可写部分的接口。

除了定义一些方法之外,此接口还实现了 EventEmitterInterface,允许您响应某些事件。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受与文档中描述的精确事件参数。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略并没有任何效果,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义才能被视为良好行为的流。

请注意,此接口的高级实现可以选择定义具有专用语义的额外事件,这些语义不是作为此低级流规范的一部分定义的。对这些事件语义的遵守超出此接口的范围,因此您可能还需要参考此类高级实现的文档。

drain事件

每当写入缓冲区之前已满,现在准备好接受更多数据时,将发出 drain 事件。

$stream->on('drain', function () use ($stream) {
    echo 'Stream is now ready to accept more data';
});

此事件应该在缓冲区之前已满,现在准备好接受更多数据时发出一次。换句话说,此事件可能被触发任意次数,如果缓冲区最初从未满过,则可能为0次。如果缓冲区之前从未满过,则不应该发出此事件。

此事件主要用于内部,请参阅 write() 获取更多详细信息。

pipe事件

每当一个可读流被 pipe() 到此流中时,将发出 pipe 事件。此事件接收一个 ReadableStreamInterface 参数,作为源流。

$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) {
    echo 'Now receiving piped data';

    // explicitly close target if source emits an error
    $source->on('error', function () use ($stream) {
        $stream->close();
    });
});

$source->pipe($stream);

对于每个成功通过此目标流管道的可读流,必须发出此事件一次。换句话说,此事件可能被触发任意次数,如果从未有流被管道到此流中,则可能为0次。如果源流不可读(已关闭)或此目标不可写(已关闭),则不应该发出此事件。

此事件主要用于内部,请参阅 pipe() 获取更多详细信息。

error事件

当发生致命错误时(通常在尝试写入此流时),将发出 error 事件。此事件接收一个 Exception 参数作为错误实例。

$stream->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

当流检测到致命错误(例如致命传输错误)时,应该发出此事件。它不应该在先前的 errorclose 事件之后发出。如果这不是一个致命错误条件,例如没有导致数据丢失的暂时性网络问题,则不应该发出此事件。

在流发生错误后,必须关闭流,因此应该随后跟随一个 close 事件,然后切换到非可写模式,请参阅 close()isWritable()

许多常见的流(如TCP/IP连接或基于文件的流)仅处理数据传输,并且可能会在发生致命传输错误时只发出一次,然后响应地关闭(终止)流。

如果这个流是DuplexStreamInterface,你应该注意流的可读部分也实现了error事件。换句话说,在读取或写入流时可能会发生错误,这应该导致相同的错误处理。

close事件

当流关闭(终止)时,将触发close事件。

$stream->on('close', function () {
    echo 'CLOSED';
});

此事件应根据流是否终止而发出一次或永不发出。它不应在之前的 close 事件之后发出。

在流关闭后,它必须切换到非可写模式,参见isWritable()

无论这种关闭是由于不可恢复的错误隐式发生,还是由于任一方显式关闭流而发生的,此事件都应在流关闭时发出。

许多常见的流(如TCP/IP连接或基于文件的流)可能会在从end()方法刷新缓冲区后、接收到一个成功的end事件或接收到一个致命的传输error事件后发出此事件。

如果这个流是DuplexStreamInterface,你应该注意流的可读部分也实现了close事件。换句话说,在接收到此事件后,流必须切换到非可写和非可读模式,参见isReadable()。请注意,此事件不应与end事件混淆。

isWritable()

isWritable(): bool方法可以用来检查此流是否处于可写状态(尚未关闭)。

此方法可以用来检查流是否仍然接受写入任何数据,或者它是否已经结束或关闭。向非可写流写入任何数据都是无效操作。

assert($stream->isWritable() === false);

$stream->write('end'); // NO-OP
$stream->end('end'); // NO-OP

一个成功打开的流始终必须以可写模式开始。

一旦流结束或关闭,它必须切换到非可写模式。这可以在任何时候发生,可以是通过end()close()显式发生,或者由于远程关闭或不可恢复的传输错误隐式发生。一旦流切换到非可写模式,它不得返回到可写模式。

如果这个流是DuplexStreamInterface,你应该注意流的可读部分也实现了isReadable()方法。除非这是一个半开双工流,否则它们通常应该有相同的返回值。

write()

write(mixed $data): bool方法可以用来将一些数据写入流。

成功的写入必须用布尔值true来确认,这意味着数据要么立即写入(刷新),要么是缓存的并计划在将来写入。请注意,此接口不会给你控制显式刷新缓冲数据的权限,因为找到合适的时机超出了此接口的范围,并留给此接口的实现。

许多常见的流(如TCP/IP连接或文件流)可能会选择使用底层的EventLoop来检查资源何时实际上是可写的,通过使用它来缓冲所有给定的数据并计划未来的刷新。

如果流无法处理写入(或刷新)数据,它应该发出一个error事件,并且如果它无法从该错误中恢复,则可以close()流。

如果在添加$data后内部缓冲区已满,则write()应该返回false,表示调用者应停止发送数据,直到缓冲区排空。流应在缓冲区准备好接受更多数据时发送一个drain事件。

同样,如果流不可写(已经处于关闭状态),它不得处理给定的$data,并且应该返回false,表示调用者应停止发送数据。

给定的 $data 参数可能为混合类型,但通常建议它应该是 string 类型的值,或者可以使用可以表示为 string 的类型,以实现最大兼容性。

许多常见的流(如 TCP/IP 连接或基于文件的流)只接受作为 string 值块传输的原始(二进制)有效负载数据。

由于这种基于流的特性,发送者可能发送任意数量的不同大小的块。无法保证这些块将以发送者意图发送的精确帧格式接收。换句话说,许多底层协议(如TCP/IP)以从单个字节到几十千字节之间的块来传输数据。您可能需要在这些底层数据块上应用更高层的协议,以实现正确的消息帧。

end()

可以使用 end(mixed $data = null): void 方法成功结束流(在可选地发送一些最终数据后)。

此方法可以用来成功结束流,即关闭流,发送所有当前缓存的全部数据。

$stream->write('hello');
$stream->write('world');
$stream->end();

如果没有当前缓存的数据需要刷新,则此方法可以立即调用 close() 来关闭流。

如果缓冲区中仍有需要先刷新的数据,则此方法应尝试写入这些数据,然后才调用 close() 关闭流。一旦流关闭,它应发出一个 close 事件。

请注意,此接口不会提供显式刷新缓冲数据的控制,因为确定适当的时间超出了此接口的范围,留给此接口的实现。

许多常见的流(如TCP/IP连接或文件流)可能会选择使用底层的EventLoop来检查资源何时实际上是可写的,通过使用它来缓冲所有给定的数据并计划未来的刷新。

可以在结束流之前可选地传递一些要写入流的数据。如果给定的 $data 为非 null 值,则此方法将像在结束之前调用 write($data) 一样表现。

// shorter version
$stream->end('bye');

// same as longer version
$stream->write('bye');
$stream->end();

调用此方法后,流必须切换到非可写模式,有关详细信息,请参阅 isWritable()。这意味着无法进行进一步写入,因此任何额外的 write()end() 调用都没有效果。

$stream->end();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

如果此流是 DuplexStreamInterface,调用此方法应也会结束其可读侧,除非流支持半开模式。换句话说,调用此方法后,这些流应切换到非可写和非可读模式,有关详细信息,请参阅 isReadable()。这意味着在这种情况下,流不应再发出任何 dataend 事件。流可以选择使用 pause() 方法的逻辑,但必须特别注意确保后续对 resume() 方法的调用不应继续发出可读事件。

请注意,此方法不应与 close() 方法混淆。

close()

可以使用 close(): void 方法来关闭流(强制关闭)。

此方法可以用来强制关闭流,即关闭流而无需等待任何缓冲数据被刷新。如果缓冲区中仍有数据,则应丢弃这些数据。

$stream->close();

一旦流关闭,它应该发出一个 close 事件。请注意,此事件不应该被触发多次,尤其是在调用此方法多次时。

调用此方法后,流必须切换到非可写模式,有关详细信息,请参阅 isWritable()。这意味着无法进行进一步写入,因此任何额外的 write()end() 调用都没有效果。

$stream->close();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

请注意,此方法不应与 end() 方法混淆。与 end() 方法不同,此方法不处理任何现有缓冲区,并简单地丢弃任何缓冲区内容。同样,在流上调用 end() 后,也可以调用此方法,以停止等待流刷新其最终数据。

$stream->end();
$loop->addTimer(1.0, function () use ($stream) {
    $stream->close();
});

如果此流是 DuplexStreamInterface,您还应该注意流的可读侧也实现了 close() 方法。换句话说,在调用此方法后,流必须切换到非可写和非可读模式,有关详细信息,请参阅 isReadable()

DuplexStreamInterface

DuplexStreamInterface 负责为双工流(可读和可写)提供接口。

它建立在现有的可读和可写流接口之上,并遵循完全相同的方法和事件语义。如果您是此概念的初学者,应首先查看 ReadableStreamInterfaceWritableStreamInterface

除了定义了一些方法之外,此接口还实现了EventEmitterInterface,允许你对在ReadableStreamInterfaceWritableStreamInterface上定义的相同事件做出响应。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受与文档中描述的精确事件参数。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略并没有任何效果,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义才能被视为良好行为的流。

请注意,此接口的高级实现可以选择定义具有专用语义的额外事件,这些语义不是作为此低级流规范的一部分定义的。对这些事件语义的遵守超出此接口的范围,因此您可能还需要参考此类高级实现的文档。

有关更多详细信息,请参阅ReadableStreamInterfaceWritableStreamInterfaceReadableStreamInterfaceWritableStreamInterface

创建流

ReactPHP在其生态系统中使用“流”的概念,因此许多高级用户只需处理stream usage。这意味着流实例通常在某个高级组件内部创建,许多用户实际上根本不需要处理创建流实例。

  • 如果您想要接受传入或建立出站明文TCP/IP或安全TLS套接字连接流,请使用react/socket
  • 如果您想要接收传入的HTTP请求体流,请使用react/http
  • 如果您想要通过进程管道(如STDIN、STDOUT、STDERR等)与子进程通信,请使用react/child-process
  • 如果您想要从/向文件系统读取/写入,请使用实验性的react/filesystem
  • 请参阅最后一章,以了解更多实际应用场景。

但是,如果您正在编写低级组件或想要从流资源创建流实例,那么下一章就是为您准备的。

注意,以下示例仅使用fopen()stream_socket_client()进行说明。这些函数不应该在实际的异步程序中使用,因为每个调用可能需要几秒钟才能完成,否则会阻塞事件循环。此外,fopen()调用在某些平台上可能返回文件句柄,而这些文件句柄可能不是所有事件循环实现都支持的。作为替代,您可能想要使用上面列出的高级库。

ReadableResourceStream

ReadableResourceStream是PHP流资源的ReadableStreamInterface的具体实现。

这可以用来表示只读资源,例如以可读模式打开的文件流或类似STDIN的流。

$stream = new ReadableResourceStream(STDIN, $loop);
$stream->on('data', function ($chunk) {
    echo $chunk;
});
$stream->on('end', function () {
    echo 'END';
});

有关更多详细信息,请参阅ReadableStreamInterface

构造函数提供的第一个参数必须是打开在读取模式下的有效流资源(例如,fopen()模式r)。否则,它将抛出InvalidArgumentException异常。

// throws InvalidArgumentException
$stream = new ReadableResourceStream(false, $loop);

有关读写流资源,请参阅DuplexResourceStream

内部,此类会尝试在流资源上启用非阻塞模式,这可能不支持所有流资源。最值得注意的是,Windows上的管道(如STDIN等)不支持。如果失败,它将抛出RuntimeException异常。

// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层流资源。您应该只使用其公共API,不应该手动干预底层流资源。

此类接受一个可选的 int|null $readChunkSize 参数,该参数控制一次从流中读取的最大缓冲区字节数。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多从底层流资源中读取 X 个字节。请注意,实际读取的字节数可能更低,如果流资源当前可用小于 X 个字节。这可以是 -1,表示从底层流资源中“读取所有可用的内容”。这将读取,直到流资源不再可读(即底层缓冲区耗尽),请注意,这并不一定意味着它到达了文件末尾。

$stream = new ReadableResourceStream(STDIN, $loop, 8192);

PHP 错误警告:如果 PHP 进程已被明确启动而没有 STDIN 流,那么尝试从 STDIN 读取可能会返回另一个流资源的数据。如果您使用空流启动而不是 php test.php <&-,而是使用 php test.php < /dev/null,则不会发生这种情况。有关更多详细信息,请参阅 #81

WritableResourceStream

WritableResourceStream 是 PHP 流资源的 WritableStreamInterface 的具体实现。

这可以用来表示一个只写的资源,如以可写模式打开的文件流或如 STDOUTSTDERR 的流。

$stream = new WritableResourceStream(STDOUT, $loop);
$stream->write('hello!');
$stream->end();

有关更多详细信息,请参阅 WritableStreamInterface

传递给构造函数的第一个参数必须是有效的流资源,并且已打开用于写入。否则,它将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new WritableResourceStream(false, $loop);

有关读写流资源,请参阅DuplexResourceStream

内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。最值得注意的是,这不支持 Windows 上的管道(STDOUT、STDERR 等)。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层流资源。您应该只使用其公共API,不应该手动干预底层流资源。

此类的任何 write() 调用都不会立即执行,而是在事件循环报告流资源已准备好接收数据时异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,定义了它在调用者应该停止发送更多数据之前愿意接受多少数据。

此类接受一个可选的 int|null $writeBufferSoftLimit 参数,该参数控制此最大缓冲区字节数。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。

$stream = new WritableResourceStream(STDOUT, $loop, 8192);

此类接受一个可选的 int|null $writeChunkSize 参数,该参数控制一次写入到流中的最大缓冲区字节数。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多将 X 个字节写入到底层流资源。请注意,实际写入的字节数可能更低,如果流资源当前可用小于 X 个字节。这可以是 -1,表示将“所有可用的内容”写入到底层流资源。

$stream = new WritableResourceStream(STDOUT, $loop, null, 8192);

有关更多详细信息,请参阅 write()

DuplexResourceStream

DuplexResourceStream 是 PHP 流资源的 DuplexStreamInterface 的具体实现。

这可以用来表示一个读写资源,如以读写模式打开的文件流或 TCP/IP 连接等流。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop);
$stream->write('hello!');
$stream->end();

另请参阅 DuplexStreamInterface 以获取更多详细信息。

构造函数传入的第一个参数必须是有效的流资源,并且该资源已打开用于读写。否则,将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new DuplexResourceStream(false, $loop);

另请参阅 ReadableResourceStream 用于只读流和 WritableResourceStream 用于只写流资源。

内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。最值得注意的是,这不支持 Windows 上的管道(STDOUT、STDERR 等)。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层流资源。您应该只使用其公共API,不应该手动干预底层流资源。

此类接受一个可选的 int|null $readChunkSize 参数,该参数控制一次从流中读取的最大缓冲区字节数。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多从底层流资源中读取 X 个字节。请注意,实际读取的字节数可能更低,如果流资源当前可用小于 X 个字节。这可以是 -1,表示从底层流资源中“读取所有可用的内容”。这将读取,直到流资源不再可读(即底层缓冲区耗尽),请注意,这并不一定意味着它到达了文件末尾。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop, 8192);

此类的任何 write() 调用都不会立即执行,而是在事件循环报告流资源已准备好接收数据时异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,定义了它在调用者应该停止发送更多数据之前愿意接受多少数据。

此类还接受另一个可选的 WritableStreamInterface|null $buffer 参数,该参数控制此流的写入行为。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。

如果您想更改写入缓冲区软限制,可以传递一个 WritableResourceStream 实例,如下所示

$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, $loop, null, $buffer);

另请参阅 WritableResourceStream 以获取更多详细信息。

ThroughStream

ThroughStream 实现了 DuplexStreamInterface,并将简单地将其接收到的任何数据传递到其可读端。

$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));

$through->write('hello');

类似地,end() 方法将结束流并发出 end 事件,然后 close() 流。close() 方法将关闭流并发出 close 事件。因此,这也可以在 pipe() 上下文中使用,如下所示

$through = new ThroughStream();
$source->pipe($through)->pipe($dest);

可选地,其构造函数可以接受任何可调用的函数,然后该函数将用于 过滤 传递给它的任何数据。此函数接收一个数据参数,该参数传递给可写端,并必须返回数据,该数据将传递到其可读端

$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);

请注意,此类不对任何数据类型做出假设。它可以用于转换数据,例如,将任何结构化数据转换为类似这样的换行符分隔的 JSON (NDJSON) 流

$through = new ThroughStream(function ($data) {
    return json_encode($data) . PHP_EOL;
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));

$through->write(array(2, true));

回调函数允许抛出 Exception。在这种情况下,流将发出 error 事件,然后 close() 流。

$through = new ThroughStream(function ($data) {
    if (!is_string($data)) {
        throw new \UnexpectedValueException('Only strings allowed');
    }
    return $data;
});
$through->on('error', $this->expectCallableOnce()));
$through->on('close', $this->expectCallableOnce()));
$through->on('data', $this->expectCallableNever()));

$through->write(2);

CompositeStream

CompositeStream 实现了 DuplexStreamInterface,并且可以从两个分别实现 ReadableStreamInterfaceWritableStreamInterface 的单个流创建单个双向流。

对于可能需要单个 DuplexStreamInterface 的某些 API,或者因为通常更方便地使用单个流实例,这很有用

$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT, $loop);

$stdio = new CompositeStream($stdin, $stdout);

$stdio->on('data', function ($chunk) use ($stdio) {
    $stdio->write('You said: ' . $chunk);
});

这是一个行为良好的流,它会转发来自底层流的全部流事件,并将所有流调用转发到底层流。

如果您向双向流 write(),它将简单地向可写端 write() 并返回其状态。

如果您结束双向流 end(),它将结束可写端并将可读端 pause()

如果您关闭双向流 close(),两个输入流都将关闭。如果两个输入流中的任何一个发出 close 事件,双向流也将关闭。如果在构造双向流时两个输入流中的任何一个已经关闭,它将关闭另一端并返回一个已关闭的流。

使用

以下示例可用于将源文件的全部内容传输到目标文件,而不必将整个文件读入内存。

$loop = new React\EventLoop\StreamSelectLoop;

$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'), $loop);
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'), $loop);

$source->pipe($dest);

$loop->run();

请注意,此示例仅用于说明目的,使用了 fopen()。在真正的异步程序中不应使用此方法,因为文件系统本质上是阻塞的,每次调用可能需要几秒钟。有关更复杂的示例,请参阅创建流

安装

建议通过Composer安装此库。您是Composer的新手吗?

本项目遵循SemVer。这将安装最新的支持版本。

$ composer require react/stream:^1.0

有关版本升级的详细信息,请参阅变更日志

本项目旨在在所有平台上运行,因此不需要任何PHP扩展,并支持在旧版PHP 5.3到当前PHP 7+和HHVM上运行。由于性能大幅提升,强烈建议使用PHP 7+。

测试

要运行测试套件,首先需要克隆此存储库,然后通过Composer安装所有依赖项。

$ composer install

要运行测试套件,请进入项目根目录并运行

$ php vendor/bin/phpunit

测试套件还包含一些依赖于稳定互联网连接的功能集成测试。如果您不想运行这些测试,可以简单地像这样跳过它们

$ php vendor/bin/phpunit --exclude-group internet

许可证

MIT授权,请参阅授权文件

更多信息