clue/ndjson-react

为 ReactPHP 提供流式换行分隔 JSON (NDJSON) 解析器和编码器。

v1.3.0 2022-12-23 10:58 UTC

This package is auto-updated.

Last update: 2024-08-24 10:25:44 UTC


README

CI status installs on Packagist code coverage

ReactPHP 提供 NDJSON 流式解析器和编码器。

NDJSON 可以用于将多个 JSON 记录存储在文件中,以存储任何类型的(统一)结构化数据,如用户对象列表或日志条目。它使用简单的换行符分隔每个记录,因此既可以用于高效的持久化,也可以用于简单的追加式操作。这也允许它在流式环境中使用,例如简单的进程间通信(IPC)协议或远程过程调用(RPC)机制。此库提供了一个简单的流式 API,可以高效地处理包含数千甚至数百万行的非常大的 NDJSON 文件,而不必一次性将整个文件加载到内存中。

  • 标准接口 - 通过实现 ReactPHP 的标准流式接口,可以轻松地与现有的高级组件集成。
  • 轻量级、SOLID 设计 - 提供了一个“刚好足够好”的薄抽象层,不会妨碍你。它建立在经过良好测试的组件和已建立的概念之上,而不是重新发明轮子。
  • 良好的测试覆盖率 - 包含一个 自动测试套件,并在 真实世界 中定期测试。

目录

支持我们

我们在开发、维护和更新我们出色的开源项目上投入了大量时间。您可以通过 在 GitHub 上成为赞助商 来帮助我们保持我们工作的这种高质量。赞助商将获得许多回报,请参阅我们的 赞助页面 了解详情。

让我们一起将这些项目提升到新的水平!🚀

NDJSON 格式

NDJSON(“换行分隔 JSON”或有时称为“JSON 行”)是一种用于存储大量记录(如用户记录或日志条目)的非常简单的基于文本的格式。

{"name":"Alice","age":30,"comment":"Yes, I like cheese"}
{"name":"Bob","age":50,"comment":"Hello\nWorld!"}

如果您理解 JSON,并且您是第一次查看这种换行分隔 JSON,您应该已经知道您需要了解 NDJSON 的所有内容:正如其名所示,该格式基本上由单独的行组成,其中每行都是任何有效的 JSON 文本,并且每行都由换行符分隔。

本例使用用户对象列表,每个用户都有一些任意属性。这可以很容易地调整以适应多种不同的用例,例如存储产品而不是用户,分配额外的属性或处理大量的记录。您可以使用任何文本编辑器编辑NDJSON文件,或在需要处理单个记录的流式环境中使用它们。与普通JSON文件不同,向此NDJSON文件添加新日志条目不需要修改此文件的结构(注意没有要修改的“外部数组”)。这使得它非常适合流式环境,用于面向行的CLI工具(如grep等)或需要在以后时间附加记录的日志环境。此外,这还允许它在流式环境中使用,例如简单的进程间通信(IPC)协议或远程过程调用(RPC)机制。

每行末尾的换行符允许进行一些非常简单的(检测单个记录)。虽然每行都是有效的JSON,但整个文件作为整体在技术上不再是有效的JSON,因为它包含多个JSON文本。这意味着例如,调用PHP的json_decode()在此完整输入上会失败,因为它会尝试一次性解析多个记录。同样,使用“美化打印”JSON(JSON_PRETTY_PRINT)是不允许的,因为每个JSON文本限制在正好一行。另一方面,包含换行符的值(如上述示例中的comment属性)不会引起问题,因为JSON字符串中的每个换行符都会被表示为\n

NDJSON的一个常见替代方案是逗号分隔值(CSV)。如果您想处理CSV文件,可以查看相关的项目clue/reactphp-csv

name,age,comment
Alice,30,"Yes, I like cheese"
Bob,50,"Hello
World!"

CSV看起来可能稍微简单一些,但这种简单是有代价的。CSV仅限于无类型的二维数据,因此没有标准的方式来存储任何嵌套结构或区分布尔值、字符串或整数。字段名有时使用,有时不使用(由应用程序决定)。对于包含分隔符的字段(如,或空格或换行符)的不一致处理(如上例中的comment字段)引入了额外的复杂性,并且其文本编码通常是未定义的,不太可能支持Unicode(或UTF-8),CSV文件通常使用ISO 8859-1编码或某些变体(再次由应用程序决定)。

虽然NDJSON有助于避免CSV的许多缺点,但它仍然是一个(相对)较新的格式,而CSV文件已经在生产系统中使用了数十年。这意味着如果您想与现有系统接口,您可能必须依赖于已经支持的格式。如果您正在构建新的系统,使用NDJSON是一个非常好的选择,因为它提供了灵活的方式来处理单个记录,使用一种常见的基于文本的格式,可以包含任何类型的结构化数据。

用法

解码器

Decoder(解析器)类可以用来确保在从流中读取时,您只得到完整的、有效的JSON元素。它包装了一个给定的ReadableStreamInterface,并通过相同的接口公开其数据,但以解析的值的形式而不是字符串块来发出JSON元素

{"name":"test","active":true}
{"name":"hello w\u00f6rld","active":true}
$stdin = new React\Stream\ReadableResourceStream(STDIN);

$ndjson = new Clue\React\NDJson\Decoder($stdin);

$ndjson->on('data', function ($data) {
    // $data is a parsed element from the JSON stream
    // line 1: $data = (object)array('name' => 'test', 'active' => true);
    // line 2: $data = (object)array('name' => 'hello wörld', 'active' => true);
    var_dump($data);
});

ReactPHP的流会发出数据字符串块,不假设它们的长度。这些块不一定代表完整的JSON元素,因为一个元素可能被分割成多个块。此类通过缓冲不完整的元素来重新组装这些元素。

Decoder支持与底层json_decode()函数相同的可选参数。这意味着默认情况下,JSON对象将以stdClass的形式发出。此行为可以通过可选构造函数参数来控制

$ndjson = new Clue\React\NDJson\Decoder($stdin, true);

$ndjson->on('data', function ($data) {
    // JSON objects will be emitted as assoc arrays now
});

此外,Decoder限制最大缓冲区大小(最大行长度),以避免由于用户输入格式错误导致缓冲区溢出。通常情况下,无需更改此值,除非您知道您正在处理一些过长的行。如果您想将此值从默认的64 KiB更改为其他值,它可以接受一个额外的参数。

$ndjson = new Clue\React\NDJson\Decoder($stdin, false, 512, 0, 64 * 1024);

如果底层流发出一个error事件或普通流包含任何不表示有效NDJson流的任何数据,它将发出一个error事件,然后关闭输入流。

$ndjson->on('error', function (Exception $error) {
    // an error occurred, stream will close next
});

如果底层流发出一个end事件,它将清除缓冲区中的任何不完整数据,从而在成功的情况下可能发出一个随后的data事件和end事件,或者对于上述不完整/无效的JSON数据,发出一个error事件。

$ndjson->on('end', function () {
    // stream successfully ended, stream will close next
});

如果底层流或Decoder被关闭,它将转发close事件。

$ndjson->on('close', function () {
    // stream closed
    // possibly after an "end" event or due to an "error" event
});

可以使用close(): void方法显式关闭Decoder及其底层流。

$ndjson->close();

可以使用pipe(WritableStreamInterface $dest, array $options = array()): WritableStreamInterface方法将所有数据转发到指定的目标流。请注意,Decoder发出解码/解析数据事件,而许多(大多数)可写流只期望数据块。

$ndjson->pipe($logger);

有关更多详细信息,请参阅ReactPHP的ReadableStreamInterface

编码器

可以使用Encoder(序列化器)类来确保您写入流中的任何内容都成为NDJson流中有效的JSON元素。它包装了一个给定的WritableStreamInterface,并通过相同的接口接受其数据,但它将任何数据作为完整的JSON元素处理,而不是字符串块。

$stdout = new React\Stream\WritableResourceStream(STDOUT);

$ndjson = new Clue\React\NDJson\Encoder($stdout);

$ndjson->write(array('name' => 'test', 'active' => true));
$ndjson->write(array('name' => 'hello wörld', 'active' => true));
{"name":"test","active":true}
{"name":"hello w\u00f6rld","active":true}

Encoder支持与底层json_encode()函数相同的参数。这意味着,默认情况下,输出中的Unicode字符将被转义。此行为可以通过可选构造函数参数进行控制。

$ndjson = new Clue\React\NDJson\Encoder($stdout, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);

$ndjson->write('hello wörld');
"hello wörld"

请注意,尝试传递JSON_PRETTY_PRINT选项将产生一个InvalidArgumentException,因为它与NDJSON不兼容。

如果底层流发出一个error事件或给定的数据包含任何不能表示为有效NDJson流的数据,它将发出一个error事件,然后关闭输入流。

$ndjson->on('error', function (Exception $error) {
    // an error occurred, stream will close next
});

如果底层流或Encoder被关闭,它将转发close事件。

$ndjson->on('close', function () {
    // stream closed
    // possibly after an "end" event or due to an "error" event
});

可以使用end(mixed $data = null): void方法可选地发出任何最终数据,然后软关闭Encoder及其底层流。

$ndjson->end();

可以使用close(): void方法显式关闭Encoder及其底层流。

$ndjson->close();

有关更多详细信息,请参阅ReactPHP的WritableStreamInterface

安装

安装此库的推荐方法是通过Composer您是Composer的新手吗?

此项目遵循SemVer。这将安装最新支持的版本。

composer require clue/ndjson-react:^1.3

有关版本升级的详细信息,请参阅CHANGELOG

本项目的目标是运行在任何平台上,因此不需要任何PHP扩展,并且支持在从旧版PHP 5.3到当前PHP 8+和HHVM的运行。强烈建议使用该项目支持的最新PHP版本。

测试

要运行测试套件,您首先需要克隆此仓库,然后通过Composer安装所有依赖项(Composer)

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

许可证

本项目采用宽松的MIT许可协议发布。

你知道吗?我提供定制开发服务,并为发行赞助和贡献发出发票。如有需要,请联系我(@clue)了解详情。

更多信息

  • 如果您想了解更多关于处理数据流的信息,请参阅底层react/stream组件的文档。

  • 如果您要处理压缩的NDJSON文件(.ndjson.gz文件扩展名),您可能需要在将解压缩流传递给NDJSON解码器之前,在压缩输入流上使用clue/reactphp-zlib

  • 如果您要创建压缩的NDJSON文件(.ndjson.gz文件扩展名),您可能需要在将压缩流传递给文件输出流之前,在NDJSON编码器输出流上使用clue/reactphp-zlib

  • 如果您想同时处理NDJSON流中的记录,您可能需要使用clue/reactphp-flux来同时处理许多(但不要太多)记录。

  • 如果您要处理更常见的基于文本的格式中的结构化数据,您可能需要使用clue/reactphp-csv来处理逗号分隔值(CSV)文件(.csv文件扩展名)。