amphp/stream

该包已被放弃,不再维护。作者建议使用amphp/byte-stream包。

一个用于简化非阻塞I/O工作的流抽象。

资助包维护!
amphp


README

AMPHP 是一套为 PHP 设计的事件驱动库集合,考虑到了纤程和并发性。 amphp/byte-stream特别提供了一个流抽象,以简化对各种字节流的操作。

安装

该包可以作为Composer依赖项安装。

composer require amphp/byte-stream

要求

此包需要 PHP 8.1 或更高版本。

用法

流是对有序字节序列的抽象。此包提供了基本的接口 ReadableStreamWritableStream

注意 之前的版本使用了术语 InputStreamOutputStream,但这些术语可能根据用例而变得容易混淆。

ReadableStream

ReadableStream 提供了一个主要方法: read()。它返回一个字符串或 nullnull 表示流已结束。

以下示例展示了如何缓冲整个流内容。

$stream = ...;
$buffer = "";

while (($chunk = $stream->read()) !== null) {
    $buffer .= $chunk;
}

// do something with $buffer

注意 可以使用 Amp\ByteStream\buffer($stream) 来替代,但我们在这里演示手动消耗。

此包提供了一些基本实现,其他库可能提供更多实现,例如 amphp/socket

负载

Payload 实现 ReadableStream,同时提供了一个 buffer() 方法用于缓冲整个内容。这允许按块(流式传输)或一次性消耗整个消息。当对象被销毁时,流中剩余的所有数据将自动消耗并丢弃。此类对于小负载或在需要处理之前需要整个流内容时非常有用。

缓冲

可以使用 buffer() 方法来缓冲完整的可读流。

$payload = new Payload($inputStream);
$content = $payload->buffer();

流式传输

有时按块消耗负载而不是首先完全缓冲它是有用或可能的,例如直接将大型HTTP响应体流式传输到磁盘。

while (null !== $chunk = $payload->read()) {
    // Use $chunk here, works just like any other ReadableStream
}

ReadableBuffer

ReadableBuffer 允许从一个已知字符串块创建一个 ReadableStream。如果已知整个流的全部内容,这很有帮助。

$stream = new ReadableBuffer("foobar");

它还允许通过将块传递为 null 或省略构造函数参数来创建没有块的流

$stream = new ReadableBuffer;

// The stream ends immediately
assert(null === $stream->read());

ReadableIterableStream

ReadableIterableStream 允许将一个生成字符串的 iterable 转换为 ReadableStream

$inputStream = new Amp\ByteStream\ReadableIterableStream((function () {
    for ($i = 0; $i < 10; $i++) {
        Amp\delay(1);
        yield $emit(".");
    }
})());

ReadableResourceStream

此包使用 ReadableResourceStreamWritableResourceStream 抽象 PHP 的流资源。它们自动将传入的资源设置为非阻塞模式,并允许像任何其他 ReadableStream / WritableStream 一样进行读写。它们还通过禁用无读请求时的读取监视器,并在底层写入缓冲区已满时仅激活可写性监视器来自动处理背压,这使得它们非常高效。

DecompressingReadableStream

此包实现了基于 Zlib 的压缩。可以使用 CompressingWritableStream 进行压缩,而 DecompressingReadableStream 用于解压缩。两者都可以简单地包装现有的流来应用它们。它们的构造函数都接受 $encoding$options 参数。

$readableStream = new ReadableResourceStream(STDIN);
$decompressingReadableStream = new DecompressingReadableStream($readableStream, \ZLIB_ENCODING_GZIP);

while (null !== $chunk = $decompressingReadableStream) {
    print $chunk;
}

另请参阅: ./examples/gzip-decompress.php

WritableStream

WritableStream 提供了两个主要方法:write()end()

WritableStream::write

write() 将给定字符串写入流。等待完成允许以底层流可以写入的速度进行写入,并且可能通过网络发送。只要写入缓冲区不满,TCP 流将立即返回。

即使写入者在不等待完成之前发出另一个写入之前,也始终确保写入顺序。

WritableStream::end

end() 标记流为结束。TCP 流可能会关闭底层的写入流,但必须关闭。相反,所有资源都应被释放,实际资源句柄应由 PHP 的垃圾收集过程关闭。

以下示例使用之前的示例从流中读取并将所有数据写入 WritableStream

$readableStream = ...;
$writableStream = ...;
$buffer = "";

while (($chunk = $readableStream->read()) !== null) {
    $writableStream->write($chunk);
}

$writableStream->end();

注意 可以使用 Amp\ByteStream\pipe($readableStream, $writableStream) 代替,但我们想在这里展示手动消费/写入。

此包提供了一些基本实现,其他库可能提供更多实现,例如 amphp/socket

WritableResourceStream

此包使用 ReadableResourceStreamWritableResourceStream 抽象 PHP 的流资源。它们自动将传入的资源设置为非阻塞模式,并允许像任何其他 ReadableStream / WritableStream 一样进行读写。它们还通过禁用无读请求时的读取监视器,并在底层写入缓冲区已满时仅激活可写性监视器来自动处理背压,这使得它们非常高效。

CompressingWritableStream

此包实现了基于 Zlib 的压缩。可以使用 CompressingWritableStream 进行压缩,而 DecompressingReadableStream 用于解压缩。两者都可以简单地包装现有的流来应用它们。它们的构造函数都接受 $encoding$options 参数。

$writableStream = new WritableResourceStream(STDOUT);
$compressedWritableStream = new CompressingWritableStream($writableStream, \ZLIB_ENCODING_GZIP);

for ($i = 0; $i < 100; $i++) {
    $compressedWritableStream->write(bin2hex(random_bytes(32));
}

$compressedWritableStream->end();

另请参阅: ./examples/gzip-compress.php

版本控制

amphp/byte-stream 遵循与其他所有 amphp 包相同的 semver 语义版本规范。

安全性

如果您发现任何安全相关的问题,请通过电子邮件 me@kelunik.com 而不是使用问题跟踪器。

许可证

MIT 许可证(MIT)。有关更多信息,请参阅 LICENSE