raimx/stream

ReactPHP中的非阻塞I/O事件驱动可读和可写流

维护者

详细信息

github.com/RaiMX/stream

源代码

v1.1.0 2019-01-01 16:15 UTC

README

Build Status

ReactPHP中的非阻塞I/O事件驱动可读和可写流。

为了使EventLoop更易于使用,该组件引入了强大的“流”概念。流允许您以小块的形式有效地处理大量数据(例如多吉字节文件下载),而无需一次性将所有内容存储在内存中。它们与PHP本身中的流非常相似,但具有更适合异步、非阻塞I/O的接口。

目录

流的使用

ReactPHP在其整个生态系统中使用“流”的概念,为处理任意数据内容和大小的流提供一致的高级抽象。虽然流本身是一个相当低级的概念,但它可以用作强大的抽象来构建高级组件和协议。

如果您对这个概念不熟悉,可以帮助您将其视为水管:您可以从源中消耗水,或者您可以生产水并将其(管道)传输到任何目的地(汇)。

同样,流可以是

  • 可读的(例如终端输入STDIN)或
  • 可写的(例如终端输出STDOUT)或
  • 全双工的(可读和可写,例如TCP/IP连接)

因此,此包定义了以下三个接口

ReadableStreamInterface

ReadableStreamInterface 负责提供只读流和全双工流的可读侧的接口。

除了定义一些方法外,此接口还实现了 EventEmitterInterface,允许您对某些事件做出反应。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受事件参数,正如文档中所述。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略,没有效果,因此出于性能原因,建议不要返回过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为行为良好的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,这些语义不是作为低级流规范的一部分定义的。对这些事件语义的符合性不在本接口的范围内,因此您还可能需要参考此类高级实现的文档。

数据事件

每当从这个源流读取/接收数据时,都会触发data事件。该事件接收一个用于传入数据的单一混合参数。

$stream->on('data', function ($data) {
    echo $data;
});

此事件可以多次触发,也可能一次都不触发(如果此流根本不发送任何数据)。它不应该在endclose事件之后触发。

提供的$data参数可能为混合类型,但通常建议它应该是string值或可以使用允许表示为string的类型,以实现最大的兼容性。

许多常见的流(如TCP/IP连接或基于文件的流)会以string值的块的形式发出接收到的原始(二进制)有效载荷数据。

由于这种基于流的特点,发送者可以发送任意数量的块,这些块的大小各不相同。不能保证这些块会以发送者意图发送的精确帧格式接收。换句话说,许多底层协议(如TCP/IP)在单字节值到几十千字节之间传输数据块。您可能需要应用更高层次的协议来处理这些底层数据块,以实现正确的消息封装。

结束事件

一旦源流成功到达流的末尾(EOF),就会触发end事件。

$stream->on('end', function () {
    echo 'END';
});

根据是否检测到成功的结束,此事件应该触发一次或根本不触发。它不应该在先前的endclose事件之后触发。如果由于非成功的结束而关闭流(例如,在先前的error事件之后),则绝对不应该触发此事件。

流结束之后,它必须切换到非可读模式,请参阅isReadable()

只有当成功到达结束点时,才会触发此事件,而不是如果流被不可恢复的错误中断或明确关闭。并非所有流都知道“成功的结束”这一概念。许多用例涉及检测流何时关闭(终止),在这种情况下,您应使用close事件。在流发出end事件之后,通常随后会跟随一个close事件。

许多常见的流(如TCP/IP连接或基于文件的流)会在此事件触发,如果远程端关闭了连接或成功读取到文件末尾(EOF)。

请注意,此事件不应与end()方法混淆。此事件定义了从源流成功读取的结束,而end()方法定义了向目标流写入成功的结束。

错误事件

当发生致命错误时,通常在尝试从该流读取时,会触发error事件。该事件接收一个用于错误实例的单一Exception参数。

$server->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

在流检测到致命错误(如致命传输错误或意外的data或过早的end事件)之后,应该触发此事件。它不应该在先前的errorendclose事件之后触发。如果这不是致命错误条件(例如,暂时性的网络问题,未导致任何数据丢失),则绝对不应该触发此事件。

在流发生错误后,它必须关闭流,因此应该跟随一个close事件,然后切换到非可读模式,请参阅close()isReadable()

许多常见的流(如TCP/IP连接或基于文件的流)只处理数据传输,并不假设数据边界(如意外数据或提前结束事件)。换句话说,许多底层协议(如TCP/IP)可能选择只在致命传输错误发生一次后发出信号,然后关闭(终止)流。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了error事件。换句话说,在读取或写入流的过程中可能发生错误,这应导致相同的错误处理。

关闭事件

当流关闭(终止)时,将发出close事件。

$stream->on('close', function () {
    echo 'CLOSED';
});

根据流是否终止,此事件应仅发出一次或根本不发出。它不应在先前的close事件之后发出。

流关闭后,它必须切换到非可读模式,请参阅isReadable()

end事件不同,此事件应在流关闭时发出,无论这是由于无法恢复的错误隐式发生还是由于任一端明确关闭流。如果你只想检测一个成功的结束,应使用end事件。

许多常见的流(如TCP/IP连接或基于文件的流)可能会在读取一个成功的end事件或致命传输error事件后发出此事件。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了close事件。换句话说,在接收到此事件后,流必须切换到非可写和非可读模式,请参阅isWritable()。请注意,此事件不应与end事件混淆。

isReadable()

可以使用isReadable(): bool方法来检查此流是否处于可读状态(尚未关闭)。

此方法可以用来检查流是否仍接受传入的数据事件,或者它是否已经结束或关闭。一旦流变为非可读,不应再发出任何dataend事件。

assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

成功打开的流始终必须以可读模式开始。

一旦流结束或关闭,它必须切换到非可读模式。这可以随时发生,可以是显式通过close(),也可以是隐式由于远程关闭或无法恢复的传输错误。一旦流切换到非可读模式,它不应再切换回可读模式。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了isWritable()方法。除非这是半开双工流,否则它们通常应有相同的返回值。

pause()

可以使用pause(): void方法来暂停读取传入的数据事件。

从事件循环中移除数据源文件描述符。这允许你限制传入的数据。

除非另有说明,否则成功打开的流不应以暂停状态开始。

一旦流暂停,不应再发出任何dataend事件。

$stream->pause();

$stream->on('data', assertShouldNeverCalled());
$stream->on('end', assertShouldNeverCalled());

此方法仅是建议性的,尽管通常不推荐,流可能会继续发出data事件。

你可以通过再次调用resume()来继续处理事件。

请注意,这两种方法可以调用任意次数,特别是多次调用pause()不应有任何效果。

另请参阅resume()

resume()

resume(): void方法可用于恢复读取传入的数据事件。

在之前的pause()之后重新连接数据源。

$stream->pause();

$loop->addTimer(1.0, function () use ($stream) {
    $stream->resume();
});

请注意,这两种方法可以调用任意次数,特别是没有先前的pause()就调用resume()不应有任何效果。

另请参阅pause()

pipe()

pipe(WritableStreamInterface $dest, array $options = [])方法可用于将所有数据从该可读源传输到指定的可写目标。

自动将所有传入数据发送到目标。根据目标处理能力自动调节源速度。

$source->pipe($dest);

同样,您还可以将实现DuplexStreamInterface的实例管道到自己,以便将接收到的所有数据写回。这可能是一个有用的功能,用于TCP/IP回声服务。

$connection->pipe($connection);

此方法返回目标流原样,可用于设置管道流的链。

$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);

默认情况下,此方法将在源流发出end事件时在目标流上调用end()。可以通过这种方式禁用

$source->pipe($dest, array('end' => false));

请注意,这仅适用于end事件。如果源流上发生error或显式close事件,您必须手动关闭目标流。

$source->pipe($dest);
$source->on('close', function () use ($dest) {
    $dest->end('BYE!');
});

如果源流不可读(关闭状态),则此操作无任何效果。

$source->close();
$source->pipe($dest); // NO-OP

如果目标流不可写(关闭状态),则此操作将简单地调节(暂停)源流。

$dest->close();
$source->pipe($dest); // calls $source->pause()

同样,如果管道活动期间目标流关闭,它也会调节(暂停)源流。

$source->pipe($dest);
$dest->close(); // calls $source->pause()

一旦管道设置成功,目标流必须发出一个带有此源流的事件参数的pipe事件。

close()

close(): void方法可用于关闭流(强制)。

此方法可用于(强制)关闭流。

$stream->close();

一旦流关闭,它应该发出一个close事件。请注意,此事件不应多次发出,尤其是如果该方法多次调用。

调用此方法后,流必须切换到非可读模式,请参阅isReadable()。这意味着不应再发出任何dataend事件。

$stream->close();
assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

如果此流是DuplexStreamInterface,您还应该注意流的可写端也实现了close()方法。换句话说,在调用此方法后,流必须切换到非可写和非可读模式,请参阅isWritable()。请注意,此方法不应与end()方法混淆。

WritableStreamInterface

WritableStreamInterface负责为只写流和双工流的可写端提供接口。

除了定义一些方法外,此接口还实现了 EventEmitterInterface,允许您对某些事件做出反应。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受事件参数,正如文档中所述。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略,没有效果,因此出于性能原因,建议不要返回过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为行为良好的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,这些语义不是作为低级流规范的一部分定义的。对这些事件语义的符合性不在本接口的范围内,因此您还可能需要参考此类高级实现的文档。

drain事件

每当写入缓冲区之前已满,现在准备好接受更多数据时,将发出drain事件。

$stream->on('drain', function () use ($stream) {
    echo 'Stream is now ready to accept more data';
});

此事件应在缓冲区之前每次变满并现在准备好接受更多数据时发出一次。换句话说,此事件可以发出任意次数,这可能为零次,如果缓冲区最初从未变满。如果缓冲区之前从未变满,则不应发出此事件。

此事件主要用于内部使用,请参阅write()以获取更多详细信息。

管道事件

每当可读流被pipe()方法管道到该流时,将触发pipe事件。该事件接收一个用于源流的单个ReadableStreamInterface参数。

$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) {
    echo 'Now receiving piped data';

    // explicitly close target if source emits an error
    $source->on('error', function () use ($stream) {
        $stream->close();
    });
});

$source->pipe($stream);

对于成功管道到该目标流的每个可读流,此事件必须触发一次。换句话说,此事件可能被触发任意次数,可能为零次,如果从未有流被管道到该流。如果源不可读(已关闭)或该目标不可写(已关闭),则此事件不得触发。

此事件主要在内部使用,有关更多详细信息,请参阅pipe()

错误事件

当发生致命错误时,通常在尝试写入该流时,将触发error事件。该事件接收一个用于错误实例的单个Exception参数。

$stream->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

当流检测到致命错误,例如致命传输错误时,应触发此事件。它不应在先前的errorclose事件之后触发。如果这不是致命错误条件,例如没有导致数据丢失的暂时性网络问题,则不得触发此事件。

在流发生错误后,它必须关闭流,因此应随后触发一个close事件,然后切换到非可写模式,请参阅close()isWritable()

许多常见流(如TCP/IP连接或基于文件的流)仅处理数据传输,并且可能选择仅在发生致命传输错误时触发一次,然后关闭(终止)流作为响应。

如果此流是DuplexStreamInterface,还应注意到流的可读部分也实现了error事件。换句话说,在读取或写入流时可能发生错误,这将导致相同的错误处理。

关闭事件

当流关闭(终止)时,将发出close事件。

$stream->on('close', function () {
    echo 'CLOSED';
});

根据流是否终止,此事件应仅发出一次或根本不发出。它不应在先前的close事件之后发出。

在流关闭后,它必须切换到非可写模式,请参阅isWritable()

此事件应在流关闭时触发,无论这是由于不可恢复的错误隐式发生,还是由于任一侧显式关闭流。

许多常见流(如TCP/IP连接或基于文件的流)可能会选择在从end()方法刷新缓冲区后、在接收到一个成功的 end事件或致命传输error事件后触发此事件。

如果此流是DuplexStreamInterface,还应注意到流的可读部分也实现了close事件。换句话说,在接收到此事件后,流必须切换到非可写和非可读模式,请参阅isReadable()。请注意,此事件不应与end事件混淆。

isWritable()

可以使用isWritable(): bool方法来检查此流是否处于可写状态(尚未关闭)。

此方法可用于检查流是否仍接受写入任何数据或它是否已结束或关闭。向非可写流写入任何数据都是无操作。

assert($stream->isWritable() === false);

$stream->write('end'); // NO-OP
$stream->end('end'); // NO-OP

成功打开的流始终必须以可写模式开始。

一旦流结束或关闭,它必须切换到非可写模式。这可能发生在任何时候,无论是通过显式的end()close(),还是通过远程关闭或不可恢复的传输错误隐式发生。一旦流切换到非可写模式,它不得返回到可写模式。

如果这个流是 DuplexStreamInterface,您还应该注意流的可读部分也实现了 isReadable() 方法。除非这是一个半开式全双工流,否则它们通常应该有相同的返回值。

write()

write(mixed $data): bool 方法可以用来将一些数据写入流中。

成功写入必须用布尔值 true 进行确认,这意味着数据要么被立即写入(刷新),要么被缓冲并安排在未来的写入。请注意,此接口不会给您控制显式刷新缓冲数据的权限,因为这超出了此接口的范围,并留给此接口的实现。

许多常见的流(如TCP/IP连接或基于文件的流)可能会选择将所有给定数据缓冲起来,并使用底层的EventLoop检查何时资源实际上是可写的,以此来安排未来的刷新。

如果流无法处理写入(或刷新)数据,它应该发出一个 error 事件,并且如果它不能从这个错误中恢复,则可以 close() 流。

如果在添加 $data 后内部缓冲区已满,那么 write() 应该返回 false,表示调用者应停止发送数据,直到缓冲区排空。一旦缓冲区准备好接受更多数据,流应该发送一个 drain 事件。

类似地,如果流不可写(已经处于关闭状态),它不得处理给定的 $data,并且应该返回 false,表示调用者应停止发送数据。

给定的 $data 参数可能为混合类型,但通常建议它应该是 string 值或可能使用可以表示为 string 的类型,以实现最大兼容性。

许多常见的流(如TCP/IP连接或基于文件的流)将只接受作为 string 值的块传输的原始(二进制)有效载荷数据。

由于这种基于流的特点,发送者可以发送任意数量的块,这些块的大小各不相同。不能保证这些块会以发送者意图发送的精确帧格式接收。换句话说,许多底层协议(如TCP/IP)在单字节值到几十千字节之间传输数据块。您可能需要应用更高层次的协议来处理这些底层数据块,以实现正确的消息封装。

end()

end(mixed $data = null): void 方法可以用来成功结束流(在可选地发送一些最终数据之后)。

此方法可以用来成功结束流,即关闭流,在发送出所有当前缓冲的数据之后。

$stream->write('hello');
$stream->write('world');
$stream->end();

如果没有数据当前在缓冲区中并且没有需要刷新的内容,则此方法可以立即 close 流。

如果缓冲区中仍有需要先刷新的数据,则此方法应尝试写入此数据,然后才 close 流。一旦流被关闭,它应发出一个 close 事件。

请注意,此接口不会给您控制显式刷新缓冲数据的权限,因为找到合适的时间超出了此接口的范围,并留给此接口的实现。

许多常见的流(如TCP/IP连接或基于文件的流)可能会选择将所有给定数据缓冲起来,并使用底层的EventLoop检查何时资源实际上是可写的,以此来安排未来的刷新。

您可以在结束流之前可选地传递一些最终数据写入流。如果提供了非 null$data 值,则此方法的行为将与在结束前调用 write($data) 相同,但没有数据。

// shorter version
$stream->end('bye');

// same as longer version
$stream->write('bye');
$stream->end();

调用此方法后,流必须切换到非可写模式,也参见 isWritable()。这意味着不再可能进行进一步的写入,所以任何额外的 write()end() 调用都没有效果。

$stream->end();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

如果这个流是 DuplexStreamInterface,调用此方法应该也结束其可读端,除非流支持半开模式。换句话说,在调用此方法后,这些流应该切换到不可写且不可读模式,也可以参见 isReadable()。这意味着在这种情况下,流不应该再发出任何 dataend 事件。流可以选择使用 pause() 方法的逻辑,但可能需要特别小心,以确保后续对 resume() 方法的调用不应继续发出可读事件。

请注意,此方法不应与 close() 方法混淆。

close()

close(): void方法可用于关闭流(强制)。

此方法可以用于强制关闭流,即在不等待任何缓冲数据被刷新的情况下关闭流。如果缓冲区中仍有数据,则应丢弃这些数据。

$stream->close();

一旦流关闭,它应该发出一个close事件。请注意,此事件不应多次发出,尤其是如果该方法多次调用。

调用此方法后,流必须切换到非可写模式,也参见 isWritable()。这意味着不再可能进行进一步的写入,所以任何额外的 write()end() 调用都没有效果。

$stream->close();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

请注意,此方法不应与 end() 方法混淆。与 end() 方法不同,此方法不处理任何现有缓冲区,并简单地丢弃任何缓冲区内容。同样,此方法也可以在调用流上的 end() 之后调用,以便停止等待流刷新最终数据。

$stream->end();
$loop->addTimer(1.0, function () use ($stream) {
    $stream->close();
});

如果此流是 DuplexStreamInterface,还应注意流的可读端也实现了 close() 方法。换句话说,在调用此方法后,流必须切换到不可写且不可读模式,也可以参见 isReadable()

DuplexStreamInterface

DuplexStreamInterface 负责提供双工流(可读和可写)的接口。

它建立在现有的可读和可写流接口之上,并遵循完全相同的方法和事件语义。如果您是此概念的初学者,应首先查看 ReadableStreamInterfaceWritableStreamInterface

除了定义一些方法之外,此接口还实现了 EventEmitterInterface,这允许您对在 ReadableStreamInterfaceWritableStreamInterface 上定义的同一事件做出反应。

事件回调函数必须是有效的 callable,必须遵守严格的参数定义,并且必须接受事件参数,正如文档中所述。事件回调函数不得抛出 Exception。事件回调函数的返回值将被忽略,没有效果,因此出于性能原因,建议不要返回过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为行为良好的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,这些语义不是作为低级流规范的一部分定义的。对这些事件语义的符合性不在本接口的范围内,因此您还可能需要参考此类高级实现的文档。

有关详细信息,请参见 ReadableStreamInterfaceWritableStreamInterface

创建流

ReactPHP 在其整个生态系统中使用“流”的概念,因此许多此包的高级消费者只需处理 流的使用。这意味着流实例通常在某个高级组件中创建,并且许多消费者实际上并不需要处理创建流实例。

  • 如果您想接受传入或建立输出纯文本 TCP/IP 或安全 TLS 套接字连接流,请使用 react/socket
  • 如果您想接收传入的 HTTP 请求体流,请使用 react/http
  • 如果您想通过进程管道(如 STDIN、STDOUT、STDERR 等)与子进程通信,请使用 react/child-process
  • 如果您想从文件系统读取或写入,请使用实验性的 react/filesystem
  • 有关更多实际应用,请参阅最后一章。

然而,如果您正在编写低级组件或需要从流资源创建流实例,那么下一章将适合您。

请注意,以下示例仅用于说明目的,使用了 fopen()stream_socket_client()。这些函数不应该在实际的异步程序中使用,因为每次调用可能需要几秒钟才能完成,否则会阻塞事件循环。此外,fopen() 调用在某些平台上可能会返回一个文件句柄,这可能不是所有事件循环实现都支持的。作为替代方案,您可能希望使用上面列出的高级库。

可读资源流

ReadableResourceStream 是 PHP 流资源的 ReadableStreamInterface 的具体实现。

这可以用来表示只读资源,如以可读模式打开的文件流或类似 STDIN 的流。

$stream = new ReadableResourceStream(STDIN, $loop);
$stream->on('data', function ($chunk) {
    echo $chunk;
});
$stream->on('end', function () {
    echo 'END';
});

有关更多详细信息,请参阅 ReadableStreamInterface

传递给构造函数的第一个参数必须是有效的流资源,并且已以读取模式打开(例如,fopen() 模式 r)。否则,它将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new ReadableResourceStream(false, $loop);

有关其他读取和写入流资源,请参阅 DuplexResourceStream

内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。最值得注意的是,Windows 上的管道(如 STDIN 等)不支持此功能。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层的流资源。您应该只使用其公共 API,不应该手动干预底层的流资源。

此类接受一个可选的 int|null $readChunkSize 参数,用于控制一次性从流中读取的最大缓冲区大小(以字节为单位)。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应该更改此值。这可以是一个正数,表示一次性从底层流资源中读取多达 X 字节。请注意,实际读取的字节数可能更低,如果流资源目前可用的字节数少于 X。这可以是 -1,表示从底层流资源中“读取所有可用的内容”。这将读取直到流资源不再可读(即底层缓冲区耗尽),请注意,这并不一定意味着它达到了 EOF。

$stream = new ReadableResourceStream(STDIN, $loop, 8192);

PHP 错误警告:如果 PHP 进程明确没有 STDIN 流启动,那么尝试从 STDIN 读取可能会返回来自另一个流资源的数据。如果您用空流(如 php test.php < /dev/null)而不是 php test.php <&- 启动此进程,则不会发生这种情况。有关更多详细信息,请参阅 #81

可写资源流

WritableResourceStream 是 PHP 流资源的 WritableStreamInterface 的具体实现。

这可以用来表示只写资源,如以可写模式打开的文件流或类似 STDOUTSTDERR 的流。

$stream = new WritableResourceStream(STDOUT, $loop);
$stream->write('hello!');
$stream->end();

有关更多详细信息,请参阅 WritableStreamInterface

构造函数的第一个参数必须是一个有效的写入打开的流资源。否则,它将抛出InvalidArgumentException异常。

// throws InvalidArgumentException
$stream = new WritableResourceStream(false, $loop);

有关其他读取和写入流资源,请参阅 DuplexResourceStream

此类内部尝试在流资源上启用非阻塞模式,但这可能不是所有流资源都支持的。特别是,在Windows上的管道(如STDOUT、STDERR等)不支持。如果失败,它将抛出RuntimeException异常。

// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层的流资源。您应该只使用其公共 API,不应该手动干预底层的流资源。

对这个类的任何write()调用都不会立即执行,而是会在EventLoop报告流资源准备好接收数据后异步执行。为此,它使用一个内存缓冲字符串来收集所有未完成的写入。这个缓冲区有一个软限制,定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。

此类接受一个可选的int|null $writeBufferSoftLimit参数,用于控制这个最大缓冲区字节数。您可以使用null值来应用其默认值。除非您知道自己在做什么,否则不应该更改此值。

$stream = new WritableResourceStream(STDOUT, $loop, 8192);

此类接受一个可选的int|null $writeChunkSize参数,用于控制一次性写入流的最大缓冲区字节数。您可以使用null值来应用其默认值。除非您知道自己在做什么,否则不应该更改此值。这可以是一个正数,表示一次最多写入X个字节到基础流资源。注意,实际写入的字节数可能更低,如果流资源当前可用字节数少于X字节。这也可以是-1,表示“写入所有可用”到基础流资源。

$stream = new WritableResourceStream(STDOUT, $loop, null, 8192);

有关更多详细信息,请参阅write()

DuplexResourceStream

DuplexResourceStream是PHP流资源的DuplexStreamInterface的具体实现。

这可以用来表示读写资源,如以读写模式打开的文件流或TCP/IP连接等。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop);
$stream->write('hello!');
$stream->end();

有关更多详细信息,请参阅DuplexStreamInterface

构造函数给出的第一个参数必须是一个有效的读写打开的流资源。否则,它将抛出InvalidArgumentException异常。

// throws InvalidArgumentException
$stream = new DuplexResourceStream(false, $loop);

有关其他情况,请参阅ReadableResourceStream(只读)和WritableResourceStream(只写)流资源。

此类内部尝试在流资源上启用非阻塞模式,但这可能不是所有流资源都支持的。特别是,在Windows上的管道(如STDOUT、STDERR等)不支持。如果失败,它将抛出RuntimeException异常。

// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT, $loop);

一旦使用有效的流资源调用构造函数,此类将负责底层的流资源。您应该只使用其公共 API,不应该手动干预底层的流资源。

此类接受一个可选的 int|null $readChunkSize 参数,用于控制一次性从流中读取的最大缓冲区大小(以字节为单位)。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应该更改此值。这可以是一个正数,表示一次性从底层流资源中读取多达 X 字节。请注意,实际读取的字节数可能更低,如果流资源目前可用的字节数少于 X。这可以是 -1,表示从底层流资源中“读取所有可用的内容”。这将读取直到流资源不再可读(即底层缓冲区耗尽),请注意,这并不一定意味着它达到了 EOF。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop, 8192);

对这个类的任何write()调用都不会立即执行,而是会在EventLoop报告流资源准备好接收数据后异步执行。为此,它使用一个内存缓冲字符串来收集所有未完成的写入。这个缓冲区有一个软限制,定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。

此类还接受另一个可选的WritableStreamInterface|null $buffer参数,用于控制此流的写入行为。您可以使用null值来应用其默认值。除非您知道自己在做什么,否则不应该更改此值。

如果您想更改写入缓冲区软限制,可以传递一个WritableResourceStream实例,如下所示:

$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, $loop, null, $buffer);

有关更多详细信息,请参阅WritableResourceStream

ThroughStream

ThroughStream实现了DuplexStreamInterface,并将简单地将您写入的数据传递到其可读端。

$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));

$through->write('hello');

同样地,end() 方法 将结束流并触发一个 end 事件,然后 close() 流。该 close() 方法 将关闭流并触发一个 close 事件。相应地,这也可以用在类似这样的 pipe() 上下文中

$through = new ThroughStream();
$source->pipe($through)->pipe($dest);

可选地,其构造函数接受任何可调用函数,然后将其用于 过滤 写入的数据。此函数接收一个数据参数,作为传递给可写端的数据,必须返回数据,就像它将传递到其可读端一样

$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);

请注意,此类不假设任何数据类型。它可以用于转换数据,例如,将任何结构化数据转换为类似这样的换行分隔JSON (NDJSON) 流

$through = new ThroughStream(function ($data) {
    return json_encode($data) . PHP_EOL;
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));

$through->write(array(2, true));

回调函数可以抛出 Exception。在这种情况下,流将触发一个 error 事件,然后 close() 流。

$through = new ThroughStream(function ($data) {
    if (!is_string($data)) {
        throw new \UnexpectedValueException('Only strings allowed');
    }
    return $data;
});
$through->on('error', $this->expectCallableOnce()));
$through->on('close', $this->expectCallableOnce()));
$through->on('data', $this->expectCallableNever()));

$through->write(2);

CompositeStream

CompositeStream 实现了 DuplexStreamInterface,并可用于从两个分别实现 ReadableStreamInterfaceWritableStreamInterface 的单独流创建单个双工流。

这对于某些可能需要单个 DuplexStreamInterface 的API很有用,或者仅仅因为它通常更方便与单个流实例一起工作

$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT, $loop);

$stdio = new CompositeStream($stdin, $stdout);

$stdio->on('data', function ($chunk) use ($stdio) {
    $stdio->write('You said: ' . $chunk);
});

这是一个表现良好的流,它转发底层流的所有流事件,并将所有流调用转发到底层流。

如果你向双工流 write(),它将简单地向可写端 write() 并返回其状态。

如果你向双工流 end(),它将 end 可写端并将可读端 pause()

如果你关闭双工流,两个输入流都将关闭。如果两个输入流中的任何一个触发 close 事件,双工流也将关闭。如果在构建双工流时两个输入流中的任何一个已经关闭,它将关闭另一端并返回一个已关闭的流。

使用方法

以下示例可用于将源文件的正文内容导入目标文件,而无需将整个文件全部读入内存

$loop = new React\EventLoop\StreamSelectLoop;

$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'), $loop);
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'), $loop);

$source->pipe($dest);

$loop->run();

请注意,此示例仅用于说明目的使用 fopen()。这不应真正用于异步程序,因为文件系统本质上是阻塞的,每次调用可能需要几秒钟。有关更复杂的示例,请参阅创建流

安装

安装此库的推荐方法是 通过 Composer您是 Composer 新手吗?

此项目遵循 SemVer。这将安装最新的支持版本

$ composer require react/stream:^1.0

另请参阅CHANGELOG了解版本升级的详细信息。

本项目旨在在任何平台上运行,因此不需要任何PHP扩展,并支持在旧版PHP 5.3到当前PHP 7+和HHVM上运行。由于性能有了很大提升,**强烈建议**使用PHP 7+。

测试

要运行测试套件,首先需要克隆此仓库,然后通过Composer安装所有依赖项。

$ composer install

要运行测试套件,请转到项目根目录并执行

$ php vendor/bin/phpunit

测试套件还包含一些依赖稳定网络连接的功能集成测试。如果您不想运行这些测试,可以像这样简单地跳过

$ php vendor/bin/phpunit --exclude-group internet

许可证

MIT,请参阅LICENSE文件

更多信息