react / stream
ReactPHP中用于非阻塞I/O的事件驱动可读和可写流
Requires
- php: >=5.3.8
- evenement/evenement: ^3.0 || ^2.0 || ^1.0
- react/event-loop: ^1.2
Requires (Dev)
- clue/stream-filter: ~1.2
- phpunit/phpunit: ^9.6 || ^5.7 || ^4.8.36
This package is auto-updated.
Last update: 2024-09-11 13:25:22 UTC
README
ReactPHP中的事件驱动可读和可写流,用于非阻塞I/O。
开发版本:此分支包含即将发布的v3版本的代码。要查看当前稳定版v1的代码,请查看
1.x
分支。即将发布的v3版本将是此包的发展方向。然而,我们仍将积极支持v1版本,以帮助尚未升级到最新版本的用户。有关更多详细信息,请参阅安装说明。
为了使EventLoop更容易使用,此组件引入了强大的“流”概念。流允许您以小块的方式高效地处理大量数据(例如多吉字节文件下载),而不必一次性将所有内容存储在内存中。它们与PHP自身中的流非常相似,但具有更适合异步、非阻塞I/O的接口。
目录
流的使用
ReactPHP在其生态系统中使用“流”的概念,以提供对任意数据内容和大小的流处理的一致性高级抽象。虽然流本身是一个相当低级的概念,但它可以用作强大的抽象,在顶部构建高级组件和协议。
如果您对这个概念还不熟悉,可以将它们想象成水管:您可以从源消耗水,或者您可以生产水并将其(通过管道)传输到任何目的地(接收器)。
同样,流可以是
- 可读的(例如
STDIN
终端输入)或 - 可写的(例如
STDOUT
终端输出)或 - 双向的(可读和可写,例如TCP/IP连接)
因此,此包定义了以下三个接口
ReadableStreamInterface
ReadableStreamInterface
负责提供只读流和双向流的可读方面的接口。
除了定义一些方法外,此接口还实现了EventEmitterInterface
,允许您对某些事件做出反应。
事件回调函数必须是有效的callable
,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception
。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。
此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。
请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。
data事件
data
事件将在从该源流读取/接收某些数据时发出。该事件接收一个用于传入数据的单个混合参数。
$stream->on('data', function (mixed $data): void { echo $data; });
此事件可能被发出多次,这可能是零次,如果此流根本不发送任何数据。它不应该在end
或close
事件之后发出。
提供的$data
参数可以是混合类型,但通常建议它应该是string
值或可以使用允许表示为string
的类型,以实现最大兼容性。
许多常见的流(如TCP/IP连接或基于文件的流)将作为string
值的块发出接收到的原始(二进制)有效负载数据。
由于此流是基于流的,发送者可以发送任意数量的块,块的大小各异。没有保证这些块将以发送者意图发送的精确框架接收。换句话说,许多低级协议(如TCP/IP)以块的形式传输数据,这些块的大小可能在单字节到数十千字节之间。您可能需要对这些低级数据块应用高级协议,以实现正确的消息框架。
end事件
end
事件将在源流成功到达流的末尾(EOF)时发出。
$stream->on('end', function (): void { echo 'END'; });
此事件应该只发出一次或一次也不发,具体取决于是否检测到成功的结束。它不应该在先前的end
或close
事件之后发出。如果由于非成功的结束(例如,在先前的error
事件之后)而关闭流,则不得发出此事件。
在流结束之后,它必须切换到非可读模式,请参阅isReadable()
。
此事件仅在成功到达end时才会发出,而不是在流由于不可恢复的错误或显式关闭而被中断时。并非所有流都了解“成功结束”的概念。许多用例涉及检测流何时关闭(终止),在这种情况下,您应该使用close
事件。在流发出end
事件后,通常随后会发出close
事件。
许多常见的流(如TCP/IP连接或基于文件的流)在远程端关闭连接或成功读取文件句柄直到其末尾(EOF)时将发出此事件。
请注意,不要将此事件与end()
方法混淆。此事件定义了从源流成功读取的成功结束,而end()
方法定义向目标流写入成功结束。
error事件
error
事件在发生致命错误时发出,通常在尝试从该流读取时。该事件接收一个用于错误实例的单一Exception
参数。
$server->on('error', function (Exception $e): void { echo 'Error: ' . $e->getMessage() . PHP_EOL; });
此事件应该在流检测到致命错误时发出,例如致命传输错误或意外data
或过早end
事件之后。它不应该在先前的error
、end
或close
事件之后发出。如果这不是致命错误条件,则不得发出此事件,例如没有导致任何数据丢失的暂时性网络问题。
在流发生错误后,它必须关闭流,因此应随后发出close
事件,然后切换到非可读模式,请参阅close()
和isReadable()
。
许多常见的流(如TCP/IP连接或基于文件的流)仅处理数据传输,并不对数据边界(如意外的data
或提前的end
事件)做假设。换句话说,许多低级协议(如TCP/IP)可能选择只在出现致命传输错误时发出一次,然后关闭(终止)流。
如果这个流是DuplexStreamInterface
,你还应该注意流的可写端也实现了error
事件。换句话说,在读取或写入流的过程中可能会发生错误,这应该导致相同的错误处理。
close事件
当流关闭(终止)时,会发出close
事件。
$stream->on('close', function (): void { echo 'CLOSED'; });
根据流是否终止,这个事件应该只发出一次或根本不发出。它不应该在之前的close
事件之后发出。
流关闭后,它必须切换到不可读模式,参见isReadable()
。
与end
事件不同,这个事件应该在流关闭时发出,无论这是由于不可恢复的错误隐式发生还是由于任一侧显式关闭流。如果你只想检测一个成功的结束,你应该使用end
事件。
许多常见的流(如TCP/IP连接或基于文件的流)可能会在读取一个成功的end
事件或致命传输error
事件后发出此事件。
如果这个流是DuplexStreamInterface
,你还应该注意流的可写端也实现了close
事件。换句话说,在接收到此事件后,流必须切换到不可写且不可读模式,参见isWritable()
。请注意,不要将此事件与end
事件混淆。
isReadable()
isReadable(): bool
方法可以用来检查此流是否处于可读状态(尚未关闭)。
此方法可以用来检查流是否仍然接受传入的数据事件,或者它是否已经结束或关闭。一旦流变为不可读,不应再发出任何data
或end
事件。
assert($stream->isReadable() === false); $stream->on('data', assertNeverCalled()); $stream->on('end', assertNeverCalled());
成功打开的流始终必须以可读模式开始。
一旦流结束或关闭,它必须切换到不可读模式。这可以随时发生,可能是通过close()
显式发生,也可能是由于远程关闭或不可恢复的传输错误隐式发生。一旦流切换到不可读模式,它不得再返回到可读模式。
如果这个流是DuplexStreamInterface
,你还应该注意流的可写端也实现了isWritable()
方法。除非这是一个半开式双工流,否则它们通常应该有相同的返回值。
pause()
pause(): void
方法可以用来暂停读取传入的数据事件。
从事件循环中移除数据源文件描述符。这允许你控制传入的数据。
除非另有说明,成功打开的流不应以暂停状态开始。
一旦流被暂停,不应再发出任何data
或end
事件。
$stream->pause(); $stream->on('data', assertShouldNeverCalled()); $stream->on('end', assertShouldNeverCalled());
此方法仅提供咨询,尽管通常不推荐,流可能会继续发出data
事件。
你可以通过再次调用resume()
来继续处理事件。
请注意,这两个方法可以多次调用,特别是多次调用pause()
不应有任何效果。
另请参阅resume()
。
resume()
resume(): void
方法可以用来恢复读取传入的数据事件。
在之前的pause()
后重新附加数据源。
$stream->pause(); Loop::addTimer(1.0, function () use ($stream): void { $stream->resume(); });
请注意,这两种方法可以调用任意次数,特别是调用 resume()
而没有先前的 pause()
不应该有任何影响。
另请参阅 pause()
。
pipe()
pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
方法可以用来将此可读源的所有数据管道传输到指定的可写目标。
自动将所有传入数据发送到目标。根据目标能处理的数据自动限制源的速度。
$source->pipe($dest);
同样,您也可以将实现 DuplexStreamInterface
的实例管道传输到自身,以便将接收到的所有数据写回。这可能是一个有用的功能,例如在 TCP/IP 回显服务中
$connection->pipe($connection);
此方法返回目标流,无需更改,可以用来设置管道流链
$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);
默认情况下,当源流发出 end
事件时,将调用目标流的 end()
。这可以通过以下方式禁用
$source->pipe($dest, ['end' => false]);
请注意,这仅适用于 end
事件。如果在源流上发生 error
或显式的 close
事件,您必须手动关闭目标流
$source->pipe($dest); $source->on('close', function () use ($dest): void { $dest->end('BYE!'); });
如果源流不可读(已关闭状态),则这不会产生任何影响。
$source->close(); $source->pipe($dest); // NO-OP
如果目标流不可写(已关闭状态),则这只会限制(暂停)源流
$dest->close(); $source->pipe($dest); // calls $source->pause()
同样,如果目标流在管道仍然活跃时关闭,它也会限制(暂停)源流
$source->pipe($dest); $dest->close(); // calls $source->pause()
一旦管道设置成功,目标流必须发出一个带有此源流的 pipe
事件作为事件参数。
close()
close(): void
方法可以用来关闭流(强制关闭)。
此方法可以用来(强制)关闭流。
$stream->close();
一旦流被关闭,它应该发出一个 close
事件。请注意,此事件不应该被多次发出,特别是如果此方法被多次调用。
调用此方法后,流必须切换到非可读模式,另请参阅 isReadable()
。这意味着不应再发出任何 data
或 end
事件。
$stream->close(); assert($stream->isReadable() === false); $stream->on('data', assertNeverCalled()); $stream->on('end', assertNeverCalled());
如果此流是 DuplexStreamInterface
,您还应注意到流的可写端也实现了 close()
方法。换句话说,调用此方法后,流必须切换到非可写和非可读模式,另请参阅 isWritable()
。请注意,此方法不应与 end()
方法混淆。
WritableStreamInterface
WritableStreamInterface
负责提供只写流和双工流的可写端接口。
除了定义一些方法外,此接口还实现了EventEmitterInterface
,允许您对某些事件做出反应。
事件回调函数必须是有效的callable
,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception
。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。
此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。
请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。
drain事件
每当写入缓冲区之前已满而现在准备好接受更多数据时,将发出 drain
事件。
$stream->on('drain', function () use ($stream): void { echo 'Stream is now ready to accept more data'; });
此事件应该在缓冲区之前已满而现在准备好接受更多数据时发出一次。换句话说,此事件可能被多次发出,可能为零次,如果缓冲区从未满过。如果缓冲区之前从未满过,则不应发出此事件。
此事件主要用于内部,另请参阅 write()
以获取更多详细信息。
pipe事件
每当将可读流 pipe()
到此流时,将发出 pipe
事件。事件接收一个 ReadableStreamInterface
参数,用于源流。
$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void { echo 'Now receiving piped data'; // explicitly close target if source emits an error $source->on('error', function () use ($stream): void { $stream->close(); }); }); $source->pipe($stream);
对于每个成功管道传输到此目标流的可读流,必须发出此事件一次。换句话说,此事件可能被多次发出,可能为零次,如果没有流被管道传输到此流。如果源不可读(已关闭)或此目标不可写(已关闭),则不应发出此事件。
此事件主要用于内部,另请参阅 pipe()
以获取更多详细信息。
error事件
当发生致命错误时,将发出 error
事件,通常是在尝试写入此流时。事件接收一个 Exception
参数,用于错误实例。
$stream->on('error', function (Exception $e): void { echo 'Error: ' . $e->getMessage() . PHP_EOL; });
当流检测到致命错误,例如致命传输错误时,应该一次触发此事件。不应该在先前的error
或close
事件之后触发。如果不是致命错误条件(例如,暂时性网络问题没有造成数据丢失),则不得触发此事件。
在流错误之后,它必须关闭流,因此应该随后触发一个close
事件,然后切换到不可写模式,也可参见close()
和isWritable()
。
许多常见的流(如TCP/IP连接或基于文件的流)只处理数据传输,并且可能只在这种情况下触发一次致命传输错误,然后关闭(终止)流。
如果此流是DuplexStreamInterface
,还应注意到流的可读部分也实现了error
事件。换句话说,在读取或写入流的过程中可能会发生错误,应进行相同的错误处理。
close事件
当流关闭(终止)时,会发出close
事件。
$stream->on('close', function (): void { echo 'CLOSED'; });
根据流是否终止,这个事件应该只发出一次或根本不发出。它不应该在之前的close
事件之后发出。
流关闭后,它必须切换到不可写模式,也可参见isWritable()
。
无论是因为不可恢复的错误而隐式发生,还是由于任一方显式关闭流,每次流关闭时都应触发此事件。
许多常见的流(如TCP/IP连接或基于文件的流)可能会在从end()
方法刷新缓冲区后、在接收到成功的end
事件后或接收到致命传输error
事件后触发此事件。
如果此流是DuplexStreamInterface
,还应注意到流的可读部分也实现了close
事件。换句话说,在接收到此事件后,流必须切换到不可写和非可读模式,也可参见isReadable()
。请注意,此事件不应与end
事件混淆。
isWritable()
isWritable(): bool
方法可用于检查此流是否处于可写状态(尚未关闭)。
此方法可用于检查流是否还接受写入任何数据,或者它已经结束或关闭。向非可写流写入任何数据都是NO-OP。
assert($stream->isWritable() === false); $stream->write('end'); // NO-OP $stream->end('end'); // NO-OP
成功打开的流始终必须以可写模式开始。
一旦流结束或关闭,它必须切换到不可写模式。这可以随时发生,可以是显式通过end()
或close
,或隐式由于远程关闭或不可恢复的传输错误。一旦流切换到不可写模式,它不得再切换回可写模式。
如果此流是DuplexStreamInterface
,还应注意到流的可读部分也实现了isReadable()
方法。除非这是半开式全双工流,否则它们通常应该有相同的返回值。
write()
write(mixed $data): bool
方法可用于将一些数据写入流。
成功的写入必须通过布尔值true
确认,这意味着数据要么立即写入(刷新),要么缓冲并计划未来写入。请注意,此接口不会让您控制显式刷新缓冲区数据,因为这超出了此接口的范围,留给此接口的实现。
许多常见的流(如TCP/IP连接或基于文件的流)可能会选择使用底层EventLoop检查资源何时实际上是可写的,以缓冲所有给定数据并计划未来的刷新。
如果流无法处理写入(或刷新)数据,它应该触发一个error
事件,并且如果它无法从该错误中恢复,它可能关闭流。
如果在添加$data
后内部缓冲区已满,则write()
应返回false
,表示调用方应停止发送数据,直到缓冲区清空。一旦缓冲区准备好接受更多数据,流应发送一个drain
事件。
同样,如果流不可写(已经处于关闭状态),它不应处理给定的$data
,并应返回false
,表示调用方应停止发送数据。
给定的$data
参数可能为混合类型,但通常建议它应为一个string
值或可以使用表示为string
的类型,以实现最大兼容性。
许多常见的流(如TCP/IP连接或基于文件的流)仅接受作为string
值块传输的原始(二进制)有效载荷数据。
由于此流是基于流的,发送者可以发送任意数量的块,块的大小各异。没有保证这些块将以发送者意图发送的精确框架接收。换句话说,许多低级协议(如TCP/IP)以块的形式传输数据,这些块的大小可能在单字节到数十千字节之间。您可能需要对这些低级数据块应用高级协议,以实现正确的消息框架。
end()
可以使用end(mixed $data = null): void
方法成功结束流(在可选地发送一些最终数据后)。
此方法可用于成功结束流,即发送完所有当前缓冲的数据后关闭流。
$stream->write('hello'); $stream->write('world'); $stream->end();
如果没有数据正在缓冲且没有要刷新的数据,则此方法可以立即调用close()
来关闭流。
如果缓冲区中仍有需要先刷新的数据,则此方法应尝试写出这些数据,然后才调用close()
来关闭流。一旦流被关闭,它应发出一个close
事件。
请注意,此接口不会给您控制显式刷新缓冲数据的权限,因为确定适当的时间超出了此接口的范围,留给此接口的实现。
许多常见的流(如TCP/IP连接或基于文件的流)可能会选择使用底层EventLoop检查资源何时实际上是可写的,以缓冲所有给定数据并计划未来的刷新。
您可以选择传递一些在结束流之前写入流的数据。如果$data
提供了一个非null
值,则此方法的行为就像在结束前没有数据调用write($data)
一样。
// shorter version $stream->end('bye'); // same as longer version $stream->write('bye'); $stream->end();
调用此方法后,流必须切换到非可写模式,参见isWritable()
。这意味着无法进行进一步写入,因此任何额外的write()
或end()
调用都没有效果。
$stream->end(); assert($stream->isWritable() === false); $stream->write('nope'); // NO-OP $stream->end(); // NO-OP
如果此流是DuplexStreamInterface
,调用此方法还应结束其可读侧,除非流支持半开模式。换句话说,在调用此方法后,这些流应切换到非可写和非可读模式,参见isReadable()
。这意味着在这种情况下,流不应再发出任何data
或end
事件。流可以选择使用pause()
方法的逻辑,但必须特别注意确保对resume()
方法的后续调用不应继续发出可读事件。
请注意,此方法不应与close()
方法混淆。
close()
close(): void
方法可以用来关闭流(强制关闭)。
此方法可用于强制关闭流,即在没有等待任何缓冲数据被刷新的情况下关闭流。如果缓冲区中仍有数据,则应丢弃这些数据。
$stream->close();
一旦流被关闭,它应该发出一个 close
事件。请注意,此事件不应该被多次发出,特别是如果此方法被多次调用。
调用此方法后,流必须切换到非可写模式,参见isWritable()
。这意味着无法进行进一步写入,因此任何额外的write()
或end()
调用都没有效果。
$stream->close(); assert($stream->isWritable() === false); $stream->write('nope'); // NO-OP $stream->end(); // NO-OP
请注意,此方法不应与end()
方法混淆。与end()
方法不同,此方法不关心任何现有的缓冲区,并简单地丢弃任何缓冲区内容。同样,此方法也可以在流上调用end()
后调用,以停止等待流刷新其最终数据。
$stream->end(); Loop::addTimer(1.0, function () use ($stream): void { $stream->close(); });
如果此流是DuplexStreamInterface
,您还应注意流的可读侧也实现了close()
方法。换句话说,在调用此方法后,流必须切换到非可写和非可读模式,参见isReadable()
。
DuplexStreamInterface
DuplexStreamInterface
负责提供双工流(可读和可写)的接口。
它建立在现有的可读和可写流接口之上,并遵循完全相同的方法和事件语义。如果您对这个概念还不熟悉,应首先了解 ReadableStreamInterface
和 WritableStreamInterface
。
除了定义了一些方法之外,此接口还实现了 EventEmitterInterface
,允许您对在 ReadableStreamInterface
和 WritableStreamInterface
上定义的相同事件做出反应。
事件回调函数必须是有效的callable
,遵守严格的参数定义,并且必须接受与文档中记录的参数完全一致。事件回调函数不得抛出Exception
。事件回调函数的返回值将被忽略,没有影响,因此出于性能原因,建议不要返回任何过多的数据结构。
此接口的每个实现都必须遵循以下事件语义,才能被视为良好行为的流。
请注意,此接口的高级实现可能选择定义具有特定语义的附加事件,而这些语义并非作为本低级流规范的一部分定义。对这些事件语义的兼容性超出了此接口的范围,因此您可能还需要参考此类高级实现的文档。
有关更多详细信息,请参阅 ReadableStreamInterface
和 WritableStreamInterface
。
创建流
ReactPHP 在其整个生态系统中使用“流”的概念,因此许多更高层次的用户只需处理 流的使用。这意味着流实例通常在某个更高层次的组件中创建,许多用户实际上不必处理创建流实例。
- 如果您想接受传入或建立明文 TCP/IP 或 TLS 套接字连接流,请使用 react/socket。
- 如果您想接收传入的 HTTP 请求体流,请使用 react/http。
- 如果您想通过进程管道(如 STDIN、STDOUT、STDERR 等)与子进程通信,请使用 react/child-process。
- 如果您想从文件系统读取或写入,请使用实验性的 react/filesystem。
- 请参阅最后一章了解更多实际应用 更多实际应用。
然而,如果您正在编写低级组件或想从流资源创建流实例,那么下一章就是为您准备的。
请注意,以下示例仅使用
fopen()
和stream_socket_client()
进行说明。这些函数不应该在真正的异步程序中使用,因为每个调用可能需要几秒钟才能完成,否则会阻塞事件循环。此外,在某些平台上,fopen()
调用将返回一个文件句柄,这可能不被所有事件循环实现支持。作为替代方案,您可能想使用上面列出的更高层次库。
ReadableResourceStream
ReadableResourceStream
是 PHP 流资源的 ReadableStreamInterface
的具体实现。
这可以用来表示只读资源,如以可读模式打开的文件流或 STDIN
这样的流。
$stream = new ReadableResourceStream(STDIN); $stream->on('data', function (string $chunk): void { echo $chunk; }); $stream->on('end', function (): void { echo 'END'; });
有关更多详细信息,请参阅 ReadableStreamInterface
。
构造函数提供的第一个参数必须是有效的流资源,该资源已以读取模式打开(例如,fopen()
模式 r
)。否则,它将抛出 InvalidArgumentException
。
// throws InvalidArgumentException $stream = new ReadableResourceStream(false);
有关读写流资源的其他信息,请参阅 DuplexResourceStream
。
内部,此类尝试在流资源上启用非阻塞模式,这可能不支持所有流资源。最值得注意的是,Windows 上的管道(如 STDIN 等)不支持此功能。如果失败,它将抛出 RuntimeException
。
// throws RuntimeException on Windows $stream = new ReadableResourceStream(STDIN);
一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。
此类接受一个可选的 LoopInterface|null $loop
参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null
值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。
此类接受一个可选的 int|null $readChunkSize
参数,用于控制一次从流中读取的最大缓冲区大小(字节)。您可以使用 null
值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多读取 X 字节。注意,实际读取的字节数可能更低,如果流资源当前可用的字节数少于 X 字节。这可以是 -1
,表示“读取流资源中所有可用的内容”。这应该读取直到流资源不再可读(即底层缓冲区清空),注意这并不一定意味着它达到了 EOF。
$stream = new ReadableResourceStream(STDIN, null, 8192);
PHP 错误警告:如果 PHP 进程是明确启动的,但没有
STDIN
流,那么尝试从STDIN
读取可能会返回来自其他流资源的数据。如果您使用php test.php < /dev/null
而不是php test.php <&-
启动此进程,则不会发生这种情况。有关更多详细信息,请参阅 #81。
变更日志:从 v1.2.0 版本开始,
$loop
参数可以省略(或使用null
跳过)以使用默认的循环。
WritableResourceStream
WritableResourceStream
是 PHP 流资源的一个具体实现,实现了 WritableStreamInterface
。
这可以用来表示一个写入只的资源,例如以可写模式打开的文件流或 STDOUT
或 STDERR
这样的流。
$stream = new WritableResourceStream(STDOUT); $stream->write('hello!'); $stream->end();
有关更多详细信息,请参阅 WritableStreamInterface
。
构造函数提供的第一个参数必须是有效的流资源,且必须已打开用于写入。否则,它将抛出 InvalidArgumentException
。
// throws InvalidArgumentException $stream = new WritableResourceStream(false);
有关读写流资源的其他信息,请参阅 DuplexResourceStream
。
内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。特别是,Windows 上的管道(STDOUT、STDERR 等)不支持此功能。如果失败,它将抛出 RuntimeException
。
// throws RuntimeException on Windows $stream = new WritableResourceStream(STDOUT);
一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。
此类的任何 write()
调用都不会立即执行,而是在事件循环报告流资源已准备好接受数据后异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,它定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。
此类接受一个可选的 LoopInterface|null $loop
参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null
值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。
此类接受一个可选的 int|null $writeBufferSoftLimit
参数,用于控制此最大缓冲区大小(字节)。您可以使用 null
值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。
$stream = new WritableResourceStream(STDOUT, null, 8192);
这个类接受一个可选的 int|null $writeChunkSize
参数,该参数控制一次写入流的最大缓冲区大小(以字节为单位)。您可以使用 null
值来应用其默认值。除非您了解自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多写入 X 字节到底层流资源。请注意,如果底层流资源当前可用的字节数少于 X,则实际写入的字节数可能更低。这可以是 -1
,表示“写入所有可用内容”到底层流资源。
$stream = new WritableResourceStream(STDOUT, null, null, 8192);
有关更多详细信息,请参阅 write()
。
变更日志:从 v1.2.0 版本开始,
$loop
参数可以省略(或使用null
跳过)以使用默认的循环。
DuplexResourceStream
DuplexResourceStream
是 PHP 流资源的 DuplexStreamInterface
的具体实现。
这可以用来表示一个读写资源,如以读写模式打开的文件流或 TCP/IP 连接等。
$conn = stream_socket_client('tcp://google.com:80'); $stream = new DuplexResourceStream($conn); $stream->write('hello!'); $stream->end();
有关更多详细信息,请参阅 DuplexStreamInterface
。
构造函数给出的第一个参数必须是有效的流资源,并且已打开用于读写。否则,它将抛出 InvalidArgumentException
。
// throws InvalidArgumentException $stream = new DuplexResourceStream(false);
有关只读和只写流资源的信息,请参阅 ReadableResourceStream
和 WritableResourceStream
。
内部,此类尝试在流资源上启用非阻塞模式,这可能不是所有流资源都支持的。特别是,Windows 上的管道(STDOUT、STDERR 等)不支持此功能。如果失败,它将抛出 RuntimeException
。
// throws RuntimeException on Windows $stream = new DuplexResourceStream(STDOUT);
一旦使用有效的流资源调用了构造函数,此类将负责底层流资源。您应该只使用其公共 API,不应该手动干涉底层流资源。
此类接受一个可选的 LoopInterface|null $loop
参数,可以用来将事件循环实例传递给此对象使用。您可以使用一个 null
值来使用默认的循环。除非您确定需要显式使用某个事件循环实例,否则不应提供此值。
此类接受一个可选的 int|null $readChunkSize
参数,用于控制一次从流中读取的最大缓冲区大小(字节)。您可以使用 null
值来应用其默认值。除非您知道自己在做什么,否则不应更改此值。这可以是一个正数,表示一次最多读取 X 字节。注意,实际读取的字节数可能更低,如果流资源当前可用的字节数少于 X 字节。这可以是 -1
,表示“读取流资源中所有可用的内容”。这应该读取直到流资源不再可读(即底层缓冲区清空),注意这并不一定意味着它达到了 EOF。
$conn = stream_socket_client('tcp://google.com:80'); $stream = new DuplexResourceStream($conn, null, 8192);
此类的任何 write()
调用都不会立即执行,而是在事件循环报告流资源已准备好接受数据后异步执行。为此,它使用一个内存缓冲区字符串来收集所有挂起的写入。此缓冲区有一个软限制,它定义了在调用者应该停止发送更多数据之前它愿意接受多少数据。
这个类还接受另一个可选的 WritableStreamInterface|null $buffer
参数,该参数控制此流的写入行为。您可以使用 null
值来应用其默认值。除非您了解自己在做什么,否则不应更改此值。
如果您想更改写入缓冲区的软限制,可以传递一个类似于这样的 WritableResourceStream
实例。
$conn = stream_socket_client('tcp://google.com:80'); $buffer = new WritableResourceStream($conn, null, 8192); $stream = new DuplexResourceStream($conn, null, null, $buffer);
有关更多详细信息,请参阅 WritableResourceStream
。
变更日志:从 v1.2.0 版本开始,
$loop
参数可以省略(或使用null
跳过)以使用默认的循环。
ThroughStream
ThroughStream
实现了 DuplexStreamInterface
,并将简单地将您写入的任何数据传递到其可读端。
$through = new ThroughStream(); $through->on('data', $this->expectCallableOnceWith('hello')); $through->write('hello');
类似地,end()
方法将结束流并触发一个 end
事件,然后 close()
流。该方法将关闭流并触发一个 close
事件。因此,它也可以在 pipe()
上下文中使用,如下所示。
$through = new ThroughStream(); $source->pipe($through)->pipe($dest);
构造函数可以接受任何可调用的函数,然后将其用于 过滤 写入的数据。该函数接收一个数据参数,作为传递给可写端的数据,并必须返回数据,以便将其传递到其可读端。
$through = new ThroughStream('strtoupper'); $source->pipe($through)->pipe($dest);
请注意,此类对任何数据类型没有假设。它可以用来转换数据,例如将任何结构化数据转换为类似这样的换行符分隔的 JSON (NDJSON) 流。
$through = new ThroughStream(function (mixed $data): string { return json_encode($data) . PHP_EOL; }); $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); $through->write([2, true]);
回调函数允许抛出 Exception
。在这种情况下,流将触发一个 error
事件,然后 close()
流。
$through = new ThroughStream(function (mixed $data): string { if (!is_string($data)) { throw new \UnexpectedValueException('Only strings allowed'); } return $data; }); $through->on('error', $this->expectCallableOnce())); $through->on('close', $this->expectCallableOnce())); $through->on('data', $this->expectCallableNever())); $through->write(2);
CompositeStream
CompositeStream
实现了DuplexStreamInterface
,可以用于从两个分别实现ReadableStreamInterface
和WritableStreamInterface
的独立流中创建一个单双向流。
这在需要单个DuplexStreamInterface
或使用这样的单流实例更方便时非常有用。
$stdin = new ReadableResourceStream(STDIN); $stdout = new WritableResourceStream(STDOUT); $stdio = new CompositeStream($stdin, $stdout); $stdio->on('data', function (string $chunk) use ($stdio): void { $stdio->write('You said: ' . $chunk); });
这是一个表现良好的流,它会将底层流的所有流事件传递出去,并将所有流调用传递到底层流。
如果你向双向流中write()
,它将简单地向可写端write()
并返回其状态。
如果你结束双向流的end()
,它将结束可写端并将可读端pause()
。
如果你关闭双向流,两个输入流都将关闭。如果两个输入流中的任何一个在构造双向流时已经发出close
事件,双向流也将关闭。如果两个输入流中的任何一个在构造双向流时已经关闭,它将关闭另一个端并返回一个已关闭的流。
使用
以下示例可用于将源文件的内容传入目标文件,而不必将整个文件读入内存
$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r')); $dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w')); $source->pipe($dest);
请注意,此示例仅用于说明目的,仅使用
fopen()
。这不应在真正的异步程序中使用,因为文件系统本质上具有阻塞性,每次调用可能需要几秒钟。有关更复杂的示例,请参阅创建流。
安装
建议通过Composer安装此库。你是Composer的新手?
发布后,此项目将遵循SemVer。目前,这将安装最新的开发版本
composer require react/stream:^3@dev
有关版本升级的详细信息,请参阅变更日志。
此项目旨在在所有平台上运行,因此不需要任何PHP扩展,并支持在PHP 7.1到当前PHP 8+上运行。强烈建议为此项目使用最新支持的PHP版本。
测试
要运行测试套件,您首先需要克隆此存储库,然后通过Composer安装所有依赖项
composer install
要运行测试套件,请转到项目根目录并运行
vendor/bin/phpunit
测试套件还包括一些依赖稳定互联网连接的功能集成测试。如果您不想运行这些测试,可以像这样简单地跳过
vendor/bin/phpunit --exclude-group internet
除此之外,我们使用PHPStan在最高级别上确保整个项目中的类型安全
vendor/bin/phpstan
许可
MIT许可,请参阅许可文件。
更多
- 有关如何在现实世界应用程序中创建流的更多信息,请参阅创建流。
- 有关在现实世界应用程序中使用流的项目列表,请参阅我们的用户维基和Packagist上的依赖项。