react/stream

ReactPHP中用于非阻塞I/O的事件驱动可读和可写流

资助包维护!
Open Collective

安装次数: 52,758,420

依赖者: 188

建议者: 2

安全: 0

星标: 618

关注者: 28

分支: 62

开放问题: 2

v1.4.0 2024-06-11 12:45 UTC

README

CI status installs on Packagist

ReactPHP中的事件驱动可读和可写流,用于非阻塞I/O。

开发版本:此分支包含即将发布的v3版本的代码。要查看当前稳定版v1的代码,请查看1.x分支

即将发布的v3版本将是此包的发展方向。然而,我们仍将积极支持v1版本,以帮助尚未升级到最新版本的用户。有关更多详细信息,请参阅安装说明

为了使EventLoop更容易使用,此组件引入了强大的“流”概念。流允许您以小块的方式高效地处理大量数据(例如多吉字节文件下载),而不必一次性将所有内容存储在内存中。它们与PHP自身中的流非常相似,但具有更适合异步、非阻塞I/O的接口。

目录

流的使用

ReactPHP在其生态系统中使用“流”的概念,以提供对任意数据内容和大小的流处理的一致性高级抽象。虽然流本身是一个相当低级的概念,但它可以用作强大的抽象,在顶部构建高级组件和协议。

如果您对这个概念还不熟悉,可以将它们想象成水管:您可以从源消耗水,或者您可以生产水并将其(通过管道)传输到任何目的地(接收器)。

同样,流可以是

  • 可读的(例如STDIN终端输入)或
  • 可写的(例如STDOUT终端输出)或
  • 双向的(可读和可写,例如TCP/IP连接)

因此,此包定义了以下三个接口

ReadableStreamInterface

ReadableStreamInterface负责提供只读流和双向流的可读方面的接口。

除了定义一些方法外,此接口还实现了EventEmitterInterface,允许您对某些事件做出反应。

事件回调函数必须是有效的callable,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。

data事件

data事件将在从该源流读取/接收某些数据时发出。该事件接收一个用于传入数据的单个混合参数。

$stream->on('data', function (mixed $data): void {
    echo $data;
});

此事件可能被发出多次,这可能是零次,如果此流根本不发送任何数据。它不应该在endclose事件之后发出。

提供的$data参数可以是混合类型,但通常建议它应该是string值或可以使用允许表示为string的类型,以实现最大兼容性。

许多常见的流(如TCP/IP连接或基于文件的流)将作为string值的块发出接收到的原始(二进制)有效负载数据。

由于此流是基于流的,发送者可以发送任意数量的块,块的大小各异。没有保证这些块将以发送者意图发送的精确框架接收。换句话说,许多低级协议(如TCP/IP)以块的形式传输数据,这些块的大小可能在单字节到数十千字节之间。您可能需要对这些低级数据块应用高级协议,以实现正确的消息框架。

end事件

end事件将在源流成功到达流的末尾(EOF)时发出。

$stream->on('end', function (): void {
    echo 'END';
});

此事件应该只发出一次或一次也不发,具体取决于是否检测到成功的结束。它不应该在先前的endclose事件之后发出。如果由于非成功的结束(例如,在先前的error事件之后)而关闭流,则不得发出此事件。

在流结束之后,它必须切换到非可读模式,请参阅isReadable()

此事件仅在成功到达end时才会发出,而不是在流由于不可恢复的错误或显式关闭而被中断时。并非所有流都了解“成功结束”的概念。许多用例涉及检测流何时关闭(终止),在这种情况下,您应该使用close事件。在流发出end事件后,通常随后会发出close事件。

许多常见的流(如TCP/IP连接或基于文件的流)在远程端关闭连接或成功读取文件句柄直到其末尾(EOF)时将发出此事件。

请注意,不要将此事件与end()方法混淆。此事件定义了从源流成功读取的成功结束,而end()方法定义向目标流写入成功结束。

error事件

error事件在发生致命错误时发出,通常在尝试从该流读取时。该事件接收一个用于错误实例的单一Exception参数。

$server->on('error', function (Exception $e): void {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

此事件应该在流检测到致命错误时发出,例如致命传输错误或意外data或过早end事件之后。它不应该在先前的errorendclose事件之后发出。如果这不是致命错误条件,则不得发出此事件,例如没有导致任何数据丢失的暂时性网络问题。

在流发生错误后,它必须关闭流,因此应随后发出close事件,然后切换到非可读模式,请参阅close()isReadable()

许多常见的流(如TCP/IP连接或基于文件的流)仅处理数据传输,并不对数据边界(如意外的data或提前的end事件)做假设。换句话说,许多低级协议(如TCP/IP)可能选择只在出现致命传输错误时发出一次,然后关闭(终止)流。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了error事件。换句话说,在读取或写入流的过程中可能会发生错误,这应该导致相同的错误处理。

close事件

当流关闭(终止)时,会发出close事件。

$stream->on('close', function (): void {
    echo 'CLOSED';
});

根据流是否终止,这个事件应该只发出一次或根本不发出。它不应该在之前的close事件之后发出。

流关闭后,它必须切换到不可读模式,参见isReadable()

end事件不同,这个事件应该在流关闭时发出,无论这是由于不可恢复的错误隐式发生还是由于任一侧显式关闭流。如果你只想检测一个成功的结束,你应该使用end事件。

许多常见的流(如TCP/IP连接或基于文件的流)可能会在读取一个成功的end事件或致命传输error事件后发出此事件。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了close事件。换句话说,在接收到此事件后,流必须切换到不可写且不可读模式,参见isWritable()。请注意,不要将此事件与end事件混淆。

isReadable()

isReadable(): bool方法可以用来检查此流是否处于可读状态(尚未关闭)。

此方法可以用来检查流是否仍然接受传入的数据事件,或者它是否已经结束或关闭。一旦流变为不可读,不应再发出任何dataend事件。

assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

成功打开的流始终必须以可读模式开始。

一旦流结束或关闭,它必须切换到不可读模式。这可以随时发生,可能是通过close()显式发生,也可能是由于远程关闭或不可恢复的传输错误隐式发生。一旦流切换到不可读模式,它不得再返回到可读模式。

如果这个流是DuplexStreamInterface,你还应该注意流的可写端也实现了isWritable()方法。除非这是一个半开式双工流,否则它们通常应该有相同的返回值。

pause()

pause(): void方法可以用来暂停读取传入的数据事件。

从事件循环中移除数据源文件描述符。这允许你控制传入的数据。

除非另有说明,成功打开的流不应以暂停状态开始。

一旦流被暂停,不应再发出任何dataend事件。

$stream->pause();

$stream->on('data', assertShouldNeverCalled());
$stream->on('end', assertShouldNeverCalled());

此方法仅提供咨询,尽管通常不推荐,流可能会继续发出data事件。

你可以通过再次调用resume()来继续处理事件。

请注意,这两个方法可以多次调用,特别是多次调用pause()不应有任何效果。

另请参阅resume()

resume()

resume(): void方法可以用来恢复读取传入的数据事件。

在之前的pause()后重新附加数据源。

$stream->pause();

Loop::addTimer(1.0, function () use ($stream): void {
    $stream->resume();
});

请注意,这两种方法可以调用任意次数,特别是调用 resume() 而没有先前的 pause() 不应该有任何影响。

另请参阅 pause()

pipe()

pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface 方法可以用来将此可读源的所有数据管道传输到指定的可写目标。

自动将所有传入数据发送到目标。根据目标能处理的数据自动限制源的速度。

$source->pipe($dest);

同样,您也可以将实现 DuplexStreamInterface 的实例管道传输到自身,以便将接收到的所有数据写回。这可能是一个有用的功能,例如在 TCP/IP 回显服务中

$connection->pipe($connection);

此方法返回目标流,无需更改,可以用来设置管道流链

$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);

默认情况下,当源流发出 end 事件时,将调用目标流的 end()。这可以通过以下方式禁用

$source->pipe($dest, ['end' => false]);

请注意,这仅适用于 end 事件。如果在源流上发生 error 或显式的 close 事件,您必须手动关闭目标流

$source->pipe($dest);
$source->on('close', function () use ($dest): void {
    $dest->end('BYE!');
});

如果源流不可读(已关闭状态),则这不会产生任何影响。

$source->close();
$source->pipe($dest); // NO-OP

如果目标流不可写(已关闭状态),则这只会限制(暂停)源流

$dest->close();
$source->pipe($dest); // calls $source->pause()

同样,如果目标流在管道仍然活跃时关闭,它也会限制(暂停)源流

$source->pipe($dest);
$dest->close(); // calls $source->pause()

一旦管道设置成功,目标流必须发出一个带有此源流的 pipe 事件作为事件参数。

close()

close(): void 方法可以用来关闭流(强制关闭)。

此方法可以用来(强制)关闭流。

$stream->close();

一旦流被关闭,它应该发出一个 close 事件。请注意,此事件不应该被多次发出,特别是如果此方法被多次调用。

调用此方法后,流必须切换到非可读模式,另请参阅 isReadable()。这意味着不应再发出任何 dataend 事件。

$stream->close();
assert($stream->isReadable() === false);

$stream->on('data', assertNeverCalled());
$stream->on('end', assertNeverCalled());

如果此流是 DuplexStreamInterface,您还应注意到流的可写端也实现了 close() 方法。换句话说,调用此方法后,流必须切换到非可写和非可读模式,另请参阅 isWritable()。请注意,此方法不应与 end() 方法混淆。

WritableStreamInterface

WritableStreamInterface 负责提供只写流和双工流的可写端接口。

除了定义一些方法外,此接口还实现了EventEmitterInterface,允许您对某些事件做出反应。

事件回调函数必须是有效的callable,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。

drain事件

每当写入缓冲区之前已满而现在准备好接受更多数据时,将发出 drain 事件。

$stream->on('drain', function () use ($stream): void {
    echo 'Stream is now ready to accept more data';
});

此事件应该在缓冲区之前已满而现在准备好接受更多数据时发出一次。换句话说,此事件可能被多次发出,可能为零次,如果缓冲区从未满过。如果缓冲区之前从未满过,则不应发出此事件。

此事件主要用于内部,另请参阅 write() 以获取更多详细信息。

pipe事件

每当将可读流 pipe() 到此流时,将发出 pipe 事件。事件接收一个 ReadableStreamInterface 参数,用于源流。

$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void {
    echo 'Now receiving piped data';

    // explicitly close target if source emits an error
    $source->on('error', function () use ($stream): void {
        $stream->close();
    });
});

$source->pipe($stream);

对于每个成功管道传输到此目标流的可读流,必须发出此事件一次。换句话说,此事件可能被多次发出,可能为零次,如果没有流被管道传输到此流。如果源不可读(已关闭)或此目标不可写(已关闭),则不应发出此事件。

此事件主要用于内部,另请参阅 pipe() 以获取更多详细信息。

error事件

当发生致命错误时,将发出 error 事件,通常是在尝试写入此流时。事件接收一个 Exception 参数,用于错误实例。

$stream->on('error', function (Exception $e): void {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

当流检测到致命错误,例如致命传输错误时,应该一次触发此事件。不应该在先前的errorclose事件之后触发。如果不是致命错误条件(例如,暂时性网络问题没有造成数据丢失),则不得触发此事件。

在流错误之后,它必须关闭流,因此应该随后触发一个close事件,然后切换到不可写模式,也可参见close()isWritable()

许多常见的流(如TCP/IP连接或基于文件的流)只处理数据传输,并且可能只在这种情况下触发一次致命传输错误,然后关闭(终止)流。

如果此流是DuplexStreamInterface,还应注意到流的可读部分也实现了error事件。换句话说,在读取或写入流的过程中可能会发生错误,应进行相同的错误处理。

close事件

当流关闭(终止)时,会发出close事件。

$stream->on('close', function (): void {
    echo 'CLOSED';
});

根据流是否终止,这个事件应该只发出一次或根本不发出。它不应该在之前的close事件之后发出。

流关闭后,它必须切换到不可写模式,也可参见isWritable()

无论是因为不可恢复的错误而隐式发生,还是由于任一方显式关闭流,每次流关闭时都应触发此事件。

许多常见的流(如TCP/IP连接或基于文件的流)可能会在从end()方法刷新缓冲区后、在接收到成功的end事件后或接收到致命传输error事件后触发此事件。

如果此流是DuplexStreamInterface,还应注意到流的可读部分也实现了close事件。换句话说,在接收到此事件后,流必须切换到不可写和非可读模式,也可参见isReadable()。请注意,此事件不应与end事件混淆。

isWritable()

isWritable(): bool方法可用于检查此流是否处于可写状态(尚未关闭)。

此方法可用于检查流是否还接受写入任何数据,或者它已经结束或关闭。向非可写流写入任何数据都是NO-OP。

assert($stream->isWritable() === false);

$stream->write('end'); // NO-OP
$stream->end('end'); // NO-OP

成功打开的流始终必须以可写模式开始。

一旦流结束或关闭,它必须切换到不可写模式。这可以随时发生,可以是显式通过end()close,或隐式由于远程关闭或不可恢复的传输错误。一旦流切换到不可写模式,它不得再切换回可写模式。

如果此流是DuplexStreamInterface,还应注意到流的可读部分也实现了isReadable()方法。除非这是半开式全双工流,否则它们通常应该有相同的返回值。

write()

write(mixed $data): bool方法可用于将一些数据写入流。

成功的写入必须通过布尔值true确认,这意味着数据要么立即写入(刷新),要么缓冲并计划未来写入。请注意,此接口不会让您控制显式刷新缓冲区数据,因为这超出了此接口的范围,留给此接口的实现。

许多常见的流(如TCP/IP连接或基于文件的流)可能会选择使用底层EventLoop检查资源何时实际上是可写的,以缓冲所有给定数据并计划未来的刷新。

如果流无法处理写入(或刷新)数据,它应该触发一个error事件,并且如果它无法从该错误中恢复,它可能关闭流。

如果在添加$data后内部缓冲区已满,则write()应返回false,表示调用方应停止发送数据,直到缓冲区清空。一旦缓冲区准备好接受更多数据,流应发送一个drain事件。

同样,如果流不可写(已经处于关闭状态),它不应处理给定的$data,并应返回false,表示调用方应停止发送数据。

给定的$data参数可能为混合类型,但通常建议它应为一个string值或可以使用表示为string的类型,以实现最大兼容性。

许多常见的流(如TCP/IP连接或基于文件的流)仅接受作为string值块传输的原始(二进制)有效载荷数据。

由于此流是基于流的,发送者可以发送任意数量的块,块的大小各异。没有保证这些块将以发送者意图发送的精确框架接收。换句话说,许多低级协议(如TCP/IP)以块的形式传输数据,这些块的大小可能在单字节到数十千字节之间。您可能需要对这些低级数据块应用高级协议,以实现正确的消息框架。

end()

可以使用end(mixed $data = null): void方法成功结束流(在可选地发送一些最终数据后)。

此方法可用于成功结束流,即发送完所有当前缓冲的数据后关闭流。

$stream->write('hello');
$stream->write('world');
$stream->end();

如果没有数据正在缓冲且没有要刷新的数据,则此方法可以立即调用close()来关闭流。

如果缓冲区中仍有需要先刷新的数据,则此方法应尝试写出这些数据,然后才调用close()来关闭流。一旦流被关闭,它应发出一个close事件。

请注意,此接口不会给您控制显式刷新缓冲数据的权限,因为确定适当的时间超出了此接口的范围,留给此接口的实现。

许多常见的流(如TCP/IP连接或基于文件的流)可能会选择使用底层EventLoop检查资源何时实际上是可写的,以缓冲所有给定数据并计划未来的刷新。

您可以选择传递一些在结束流之前写入流的数据。如果$data提供了一个非null值,则此方法的行为就像在结束前没有数据调用write($data)一样。

// shorter version
$stream->end('bye');

// same as longer version
$stream->write('bye');
$stream->end();

调用此方法后,流必须切换到非可写模式,参见isWritable()。这意味着无法进行进一步写入,因此任何额外的write()end()调用都没有效果。

$stream->end();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

如果此流是DuplexStreamInterface,调用此方法还应结束其可读侧,除非流支持半开模式。换句话说,在调用此方法后,这些流应切换到非可写和非可读模式,参见isReadable()。这意味着在这种情况下,流不应再发出任何dataend事件。流可以选择使用pause()方法的逻辑,但必须特别注意确保对resume()方法的后续调用不应继续发出可读事件。

请注意,此方法不应与close()方法混淆。

close()

close(): void 方法可以用来关闭流(强制关闭)。

此方法可用于强制关闭流,即在没有等待任何缓冲数据被刷新的情况下关闭流。如果缓冲区中仍有数据,则应丢弃这些数据。

$stream->close();

一旦流被关闭,它应该发出一个 close 事件。请注意,此事件不应该被多次发出,特别是如果此方法被多次调用。

调用此方法后,流必须切换到非可写模式,参见isWritable()。这意味着无法进行进一步写入,因此任何额外的write()end()调用都没有效果。

$stream->close();
assert($stream->isWritable() === false);

$stream->write('nope'); // NO-OP
$stream->end(); // NO-OP

请注意,此方法不应与end()方法混淆。与end()方法不同,此方法不关心任何现有的缓冲区,并简单地丢弃任何缓冲区内容。同样,此方法也可以在流上调用end()后调用,以停止等待流刷新其最终数据。

$stream->end();
Loop::addTimer(1.0, function () use ($stream): void {
    $stream->close();
});

如果此流是DuplexStreamInterface,您还应注意流的可读侧也实现了close()方法。换句话说,在调用此方法后,流必须切换到非可写和非可读模式,参见isReadable()

DuplexStreamInterface

DuplexStreamInterface 负责提供双工流(可读和可写)的接口。

它建立在现有的可读和可写流接口之上,并遵循完全相同的方法和事件语义。如果您对这个概念还不熟悉,应首先了解 ReadableStreamInterfaceWritableStreamInterface

除了定义了一些方法之外,此接口还实现了 EventEmitterInterface,允许您对在 ReadableStreamInterfaceWritableStreamInterface 上定义的相同事件做出反应。

事件回调函数必须是有效的callable,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。

此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。

请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。

有关更多详细信息,请参阅 ReadableStreamInterfaceWritableStreamInterface

创建流

ReactPHP 在其整个生态系统中使用“流”的概念,因此许多更高层次的用户只需处理 流的使用。这意味着流实例通常在某个更高层次的组件中创建,许多用户实际上不必处理创建流实例。

  • 如果您想接受传入或建立明文 TCP/IP 或 TLS 套接字连接流,请使用 react/socket
  • 如果您想接收传入的 HTTP 请求体流,请使用 react/http
  • 如果您想通过进程管道(如 STDIN、STDOUT、STDERR 等)与子进程通信,请使用 react/child-process
  • 如果您想从文件系统读取或写入,请使用实验性的 react/filesystem
  • 请参阅最后一章了解更多实际应用 更多实际应用

然而,如果您正在编写低级组件或想从流资源创建流实例,那么下一章就是为您准备的。

请注意,以下示例仅使用 fopen()stream_socket_client() 进行说明。这些函数不应该在真正的异步程序中使用,因为每个调用可能需要几秒钟才能完成,否则会阻塞事件循环。此外,在某些平台上,fopen() 调用将返回一个文件句柄,这可能不被所有事件循环实现支持。作为替代方案,您可能想使用上面列出的更高层次库。

ReadableResourceStream

ReadableResourceStream 是 PHP 流资源的 ReadableStreamInterface 的具体实现。

这可以用来表示只读资源,如以可读模式打开的文件流或 STDIN 这样的流。

$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function (string $chunk): void {
    echo $chunk;
});
$stream->on('end', function (): void {
    echo 'END';
});

有关更多详细信息,请参阅 ReadableStreamInterface

构造函数提供的第一个参数必须是有效的流资源,该资源已以读取模式打开(例如,fopen() 模式 r)。否则,它将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new ReadableResourceStream(false);

有关读写流资源的其他信息,请参阅 DuplexResourceStream

内部,此类尝试在流资源上启用非阻塞模式,这可能不支持所有流资源。最值得注意的是,Windows 上的管道(如 STDIN 等)不支持此功能。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN);

一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。

此类接受一个可选的 LoopInterface|null $loop 参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null 值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。

此类接受一个可选的 int|null $readChunkSize 参数,用于控制一次从流中读取的最大缓冲区大小(字节)。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多读取 X 字节。注意,实际读取的字节数可能更低,如果流资源当前可用的字节数少于 X 字节。这可以是 -1,表示“读取流资源中所有可用的内容”。这应该读取直到流资源不再可读(即底层缓冲区清空),注意这并不一定意味着它达到了 EOF。

$stream = new ReadableResourceStream(STDIN, null, 8192);

PHP 错误警告:如果 PHP 进程是明确启动的,但没有 STDIN 流,那么尝试从 STDIN 读取可能会返回来自其他流资源的数据。如果您使用 php test.php < /dev/null 而不是 php test.php <&- 启动此进程,则不会发生这种情况。有关更多详细信息,请参阅 #81

变更日志:从 v1.2.0 版本开始,$loop 参数可以省略(或使用 null 跳过)以使用默认的循环。

WritableResourceStream

WritableResourceStream 是 PHP 流资源的一个具体实现,实现了 WritableStreamInterface

这可以用来表示一个写入只的资源,例如以可写模式打开的文件流或 STDOUTSTDERR 这样的流。

$stream = new WritableResourceStream(STDOUT);
$stream->write('hello!');
$stream->end();

有关更多详细信息,请参阅 WritableStreamInterface

构造函数提供的第一个参数必须是有效的流资源,且必须已打开用于写入。否则,它将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new WritableResourceStream(false);

有关读写流资源的其他信息,请参阅 DuplexResourceStream

内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。特别是,Windows 上的管道(STDOUT、STDERR 等)不支持此功能。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT);

一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。

此类的任何 write() 调用都不会立即执行,而是在事件循环报告流资源已准备好接受数据后异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,它定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。

此类接受一个可选的 LoopInterface|null $loop 参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null 值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。

此类接受一个可选的 int|null $writeBufferSoftLimit 参数,用于控制此最大缓冲区大小(字节)。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。

$stream = new WritableResourceStream(STDOUT, null, 8192);

这个类接受一个可选的 int|null $writeChunkSize 参数,该参数控制一次写入流的最大缓冲区大小(以字节为单位)。您可以使用 null 值来应用其默认值。除非您了解自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多写入 X 字节到底层流资源。请注意,如果底层流资源当前可用的字节数少于 X,则实际写入的字节数可能更低。这可以是 -1,表示“写入所有可用内容”到底层流资源。

$stream = new WritableResourceStream(STDOUT, null, null, 8192);

有关更多详细信息,请参阅 write()

变更日志:从 v1.2.0 版本开始,$loop 参数可以省略(或使用 null 跳过)以使用默认的循环。

DuplexResourceStream

DuplexResourceStream 是 PHP 流资源的 DuplexStreamInterface 的具体实现。

这可以用来表示一个读写资源,如以读写模式打开的文件流或 TCP/IP 连接等。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn);
$stream->write('hello!');
$stream->end();

有关更多详细信息,请参阅 DuplexStreamInterface

构造函数给出的第一个参数必须是有效的流资源,并且已打开用于读写。否则,它将抛出 InvalidArgumentException

// throws InvalidArgumentException
$stream = new DuplexResourceStream(false);

有关只读和只写流资源的信息,请参阅 ReadableResourceStreamWritableResourceStream

内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。特别是,Windows 上的管道(STDOUT、STDERR 等)不支持此功能。如果失败,它将抛出 RuntimeException

// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT);

一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。

此类接受一个可选的 LoopInterface|null $loop 参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null 值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。

此类接受一个可选的 int|null $readChunkSize 参数,用于控制一次从流中读取的最大缓冲区大小(字节)。您可以使用 null 值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多读取 X 字节。注意,实际读取的字节数可能更低,如果流资源当前可用的字节数少于 X 字节。这可以是 -1,表示“读取流资源中所有可用的内容”。这应该读取直到流资源不再可读(即底层缓冲区清空),注意这并不一定意味着它达到了 EOF。

$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, null, 8192);

此类的任何 write() 调用都不会立即执行,而是在事件循环报告流资源已准备好接受数据后异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,它定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。

这个类还接受另一个可选的 WritableStreamInterface|null $buffer 参数,该参数控制此流的写入行为。您可以使用 null 值来应用其默认值。除非您了解自己在做什么,否则不应更改此值。

如果您想更改写入缓冲区的软限制,可以传递一个类似于这样的 WritableResourceStream 实例。

$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, null, 8192);
$stream = new DuplexResourceStream($conn, null, null, $buffer);

有关更多详细信息,请参阅 WritableResourceStream

变更日志:从 v1.2.0 版本开始,$loop 参数可以省略(或使用 null 跳过)以使用默认的循环。

ThroughStream

ThroughStream 实现了 DuplexStreamInterface,并将简单地将您写入的任何数据传递到其可读端。

$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));

$through->write('hello');

类似地,end() 方法将结束流并触发一个 end 事件,然后 close() 流。该方法将关闭流并触发一个 close 事件。因此,它也可以在 pipe() 上下文中使用,如下所示。

$through = new ThroughStream();
$source->pipe($through)->pipe($dest);

构造函数可以接受任何可调用的函数,然后将其用于 过滤 写入的数据。该函数接收一个数据参数,作为传递给可写端的数据,并必须返回数据,以便将其传递到其可读端。

$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);

请注意,此类对任何数据类型没有假设。它可以用来转换数据,例如将任何结构化数据转换为类似这样的换行符分隔的 JSON (NDJSON) 流。

$through = new ThroughStream(function (mixed $data): string {
    return json_encode($data) . PHP_EOL;
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));

$through->write([2, true]);

回调函数允许抛出 Exception。在这种情况下,流将触发一个 error 事件,然后 close() 流。

$through = new ThroughStream(function (mixed $data): string {
    if (!is_string($data)) {
        throw new \UnexpectedValueException('Only strings allowed');
    }
    return $data;
});
$through->on('error', $this->expectCallableOnce()));
$through->on('close', $this->expectCallableOnce()));
$through->on('data', $this->expectCallableNever()));

$through->write(2);

CompositeStream

CompositeStream实现了DuplexStreamInterface,可以用于从两个分别实现ReadableStreamInterfaceWritableStreamInterface的独立流中创建一个单双向流。

这在需要单个DuplexStreamInterface或使用这样的单流实例更方便时非常有用。

$stdin = new ReadableResourceStream(STDIN);
$stdout = new WritableResourceStream(STDOUT);

$stdio = new CompositeStream($stdin, $stdout);

$stdio->on('data', function (string $chunk) use ($stdio): void {
    $stdio->write('You said: ' . $chunk);
});

这是一个表现良好的流,它会将底层流的所有流事件传递出去,并将所有流调用传递到底层流。

如果你向双向流中write(),它将简单地向可写端write()并返回其状态。

如果你结束双向流的end(),它将结束可写端并将可读端pause()

如果你关闭双向流,两个输入流都将关闭。如果两个输入流中的任何一个在构造双向流时已经发出close事件,双向流也将关闭。如果两个输入流中的任何一个在构造双向流时已经关闭,它将关闭另一个端并返回一个已关闭的流。

使用

以下示例可用于将源文件的内容传入目标文件,而不必将整个文件读入内存

$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'));
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'));

$source->pipe($dest);

请注意,此示例仅用于说明目的,仅使用fopen()。这不应在真正的异步程序中使用,因为文件系统本质上具有阻塞性,每次调用可能需要几秒钟。有关更复杂的示例,请参阅创建流

安装

建议通过Composer安装此库。你是Composer的新手?

发布后,此项目将遵循SemVer。目前,这将安装最新的开发版本

composer require react/stream:^3@dev

有关版本升级的详细信息,请参阅变更日志

此项目旨在在所有平台上运行,因此不需要任何PHP扩展,并支持在PHP 7.1到当前PHP 8+上运行。强烈建议为此项目使用最新支持的PHP版本

测试

要运行测试套件,您首先需要克隆此存储库,然后通过Composer安装所有依赖项

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

测试套件还包括一些依赖稳定互联网连接的功能集成测试。如果您不想运行这些测试,可以像这样简单地跳过

vendor/bin/phpunit --exclude-group internet

除此之外,我们使用PHPStan在最高级别上确保整个项目中的类型安全

vendor/bin/phpstan

许可

MIT许可,请参阅许可文件

更多