clue / ndjson-react
为 ReactPHP 提供流式换行分隔 JSON (NDJSON) 解析器和编码器。
Requires
- php: >=5.3
- react/stream: ^1.2
Requires (Dev)
- phpunit/phpunit: ^9.5 || ^5.7 || ^4.8.35
- react/event-loop: ^1.2
README
为 ReactPHP 提供 NDJSON 流式解析器和编码器。
NDJSON 可以用于将多个 JSON 记录存储在文件中,以存储任何类型的(统一)结构化数据,如用户对象列表或日志条目。它使用简单的换行符分隔每个记录,因此既可以用于高效的持久化,也可以用于简单的追加式操作。这也允许它在流式环境中使用,例如简单的进程间通信(IPC)协议或远程过程调用(RPC)机制。此库提供了一个简单的流式 API,可以高效地处理包含数千甚至数百万行的非常大的 NDJSON 文件,而不必一次性将整个文件加载到内存中。
- 标准接口 - 通过实现 ReactPHP 的标准流式接口,可以轻松地与现有的高级组件集成。
- 轻量级、SOLID 设计 - 提供了一个“刚好足够好”的薄抽象层,不会妨碍你。它建立在经过良好测试的组件和已建立的概念之上,而不是重新发明轮子。
- 良好的测试覆盖率 - 包含一个 自动测试套件,并在 真实世界 中定期测试。
目录
支持我们
我们在开发、维护和更新我们出色的开源项目上投入了大量时间。您可以通过 在 GitHub 上成为赞助商 来帮助我们保持我们工作的这种高质量。赞助商将获得许多回报,请参阅我们的 赞助页面 了解详情。
让我们一起将这些项目提升到新的水平!🚀
NDJSON 格式
NDJSON(“换行分隔 JSON”或有时称为“JSON 行”)是一种用于存储大量记录(如用户记录或日志条目)的非常简单的基于文本的格式。
{"name":"Alice","age":30,"comment":"Yes, I like cheese"} {"name":"Bob","age":50,"comment":"Hello\nWorld!"}
如果您理解 JSON,并且您是第一次查看这种换行分隔 JSON,您应该已经知道您需要了解 NDJSON 的所有内容:正如其名所示,该格式基本上由单独的行组成,其中每行都是任何有效的 JSON 文本,并且每行都由换行符分隔。
本例使用用户对象列表,每个用户都有一些任意属性。这可以很容易地调整以适应多种不同的用例,例如存储产品而不是用户,分配额外的属性或处理大量的记录。您可以使用任何文本编辑器编辑NDJSON文件,或在需要处理单个记录的流式环境中使用它们。与普通JSON文件不同,向此NDJSON文件添加新日志条目不需要修改此文件的结构(注意没有要修改的“外部数组”)。这使得它非常适合流式环境,用于面向行的CLI工具(如grep
等)或需要在以后时间附加记录的日志环境。此外,这还允许它在流式环境中使用,例如简单的进程间通信(IPC)协议或远程过程调用(RPC)机制。
每行末尾的换行符允许进行一些非常简单的帧(检测单个记录)。虽然每行都是有效的JSON,但整个文件作为整体在技术上不再是有效的JSON,因为它包含多个JSON文本。这意味着例如,调用PHP的json_decode()
在此完整输入上会失败,因为它会尝试一次性解析多个记录。同样,使用“美化打印”JSON(JSON_PRETTY_PRINT
)是不允许的,因为每个JSON文本限制在正好一行。另一方面,包含换行符的值(如上述示例中的comment
属性)不会引起问题,因为JSON字符串中的每个换行符都会被表示为\n
。
NDJSON的一个常见替代方案是逗号分隔值(CSV)。如果您想处理CSV文件,可以查看相关的项目clue/reactphp-csv
name,age,comment
Alice,30,"Yes, I like cheese"
Bob,50,"Hello
World!"
CSV看起来可能稍微简单一些,但这种简单是有代价的。CSV仅限于无类型的二维数据,因此没有标准的方式来存储任何嵌套结构或区分布尔值、字符串或整数。字段名有时使用,有时不使用(由应用程序决定)。对于包含分隔符的字段(如,
或空格或换行符)的不一致处理(如上例中的comment
字段)引入了额外的复杂性,并且其文本编码通常是未定义的,不太可能支持Unicode(或UTF-8),CSV文件通常使用ISO 8859-1编码或某些变体(再次由应用程序决定)。
虽然NDJSON有助于避免CSV的许多缺点,但它仍然是一个(相对)较新的格式,而CSV文件已经在生产系统中使用了数十年。这意味着如果您想与现有系统接口,您可能必须依赖于已经支持的格式。如果您正在构建新的系统,使用NDJSON是一个非常好的选择,因为它提供了灵活的方式来处理单个记录,使用一种常见的基于文本的格式,可以包含任何类型的结构化数据。
用法
解码器
Decoder
(解析器)类可以用来确保在从流中读取时,您只得到完整的、有效的JSON元素。它包装了一个给定的ReadableStreamInterface
,并通过相同的接口公开其数据,但以解析的值的形式而不是字符串块来发出JSON元素
{"name":"test","active":true}
{"name":"hello w\u00f6rld","active":true}
$stdin = new React\Stream\ReadableResourceStream(STDIN); $ndjson = new Clue\React\NDJson\Decoder($stdin); $ndjson->on('data', function ($data) { // $data is a parsed element from the JSON stream // line 1: $data = (object)array('name' => 'test', 'active' => true); // line 2: $data = (object)array('name' => 'hello wörld', 'active' => true); var_dump($data); });
ReactPHP的流会发出数据字符串块,不假设它们的长度。这些块不一定代表完整的JSON元素,因为一个元素可能被分割成多个块。此类通过缓冲不完整的元素来重新组装这些元素。
Decoder
支持与底层json_decode()
函数相同的可选参数。这意味着默认情况下,JSON对象将以stdClass
的形式发出。此行为可以通过可选构造函数参数来控制
$ndjson = new Clue\React\NDJson\Decoder($stdin, true); $ndjson->on('data', function ($data) { // JSON objects will be emitted as assoc arrays now });
此外,Decoder
限制最大缓冲区大小(最大行长度),以避免由于用户输入格式错误导致缓冲区溢出。通常情况下,无需更改此值,除非您知道您正在处理一些过长的行。如果您想将此值从默认的64 KiB更改为其他值,它可以接受一个额外的参数。
$ndjson = new Clue\React\NDJson\Decoder($stdin, false, 512, 0, 64 * 1024);
如果底层流发出一个error
事件或普通流包含任何不表示有效NDJson流的任何数据,它将发出一个error
事件,然后关闭输入流。
$ndjson->on('error', function (Exception $error) { // an error occurred, stream will close next });
如果底层流发出一个end
事件,它将清除缓冲区中的任何不完整数据,从而在成功的情况下可能发出一个随后的data
事件和end
事件,或者对于上述不完整/无效的JSON数据,发出一个error
事件。
$ndjson->on('end', function () { // stream successfully ended, stream will close next });
如果底层流或Decoder
被关闭,它将转发close
事件。
$ndjson->on('close', function () { // stream closed // possibly after an "end" event or due to an "error" event });
可以使用close(): void
方法显式关闭Decoder
及其底层流。
$ndjson->close();
可以使用pipe(WritableStreamInterface $dest, array $options = array()): WritableStreamInterface
方法将所有数据转发到指定的目标流。请注意,Decoder
发出解码/解析数据事件,而许多(大多数)可写流只期望数据块。
$ndjson->pipe($logger);
有关更多详细信息,请参阅ReactPHP的ReadableStreamInterface
。
编码器
可以使用Encoder
(序列化器)类来确保您写入流中的任何内容都成为NDJson流中有效的JSON元素。它包装了一个给定的WritableStreamInterface
,并通过相同的接口接受其数据,但它将任何数据作为完整的JSON元素处理,而不是字符串块。
$stdout = new React\Stream\WritableResourceStream(STDOUT); $ndjson = new Clue\React\NDJson\Encoder($stdout); $ndjson->write(array('name' => 'test', 'active' => true)); $ndjson->write(array('name' => 'hello wörld', 'active' => true));
{"name":"test","active":true}
{"name":"hello w\u00f6rld","active":true}
Encoder
支持与底层json_encode()
函数相同的参数。这意味着,默认情况下,输出中的Unicode字符将被转义。此行为可以通过可选构造函数参数进行控制。
$ndjson = new Clue\React\NDJson\Encoder($stdout, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $ndjson->write('hello wörld');
"hello wörld"
请注意,尝试传递JSON_PRETTY_PRINT
选项将产生一个InvalidArgumentException
,因为它与NDJSON不兼容。
如果底层流发出一个error
事件或给定的数据包含任何不能表示为有效NDJson流的数据,它将发出一个error
事件,然后关闭输入流。
$ndjson->on('error', function (Exception $error) { // an error occurred, stream will close next });
如果底层流或Encoder
被关闭,它将转发close
事件。
$ndjson->on('close', function () { // stream closed // possibly after an "end" event or due to an "error" event });
可以使用end(mixed $data = null): void
方法可选地发出任何最终数据,然后软关闭Encoder
及其底层流。
$ndjson->end();
可以使用close(): void
方法显式关闭Encoder
及其底层流。
$ndjson->close();
有关更多详细信息,请参阅ReactPHP的WritableStreamInterface
。
安装
安装此库的推荐方法是通过Composer。 您是Composer的新手吗?
此项目遵循SemVer
。这将安装最新支持的版本。
composer require clue/ndjson-react:^1.3
有关版本升级的详细信息,请参阅CHANGELOG。
本项目的目标是运行在任何平台上,因此不需要任何PHP扩展,并且支持在从旧版PHP 5.3到当前PHP 8+和HHVM的运行。强烈建议使用该项目支持的最新PHP版本。
测试
要运行测试套件,您首先需要克隆此仓库,然后通过Composer安装所有依赖项(Composer)
composer install
要运行测试套件,请转到项目根目录并运行
vendor/bin/phpunit
许可证
本项目采用宽松的MIT许可协议发布。
你知道吗?我提供定制开发服务,并为发行赞助和贡献发出发票。如有需要,请联系我(@clue)了解详情。
更多信息
-
如果您想了解更多关于处理数据流的信息,请参阅底层react/stream组件的文档。
-
如果您要处理压缩的NDJSON文件(
.ndjson.gz
文件扩展名),您可能需要在将解压缩流传递给NDJSON解码器之前,在压缩输入流上使用clue/reactphp-zlib。 -
如果您要创建压缩的NDJSON文件(
.ndjson.gz
文件扩展名),您可能需要在将压缩流传递给文件输出流之前,在NDJSON编码器输出流上使用clue/reactphp-zlib。 -
如果您想同时处理NDJSON流中的记录,您可能需要使用clue/reactphp-flux来同时处理许多(但不要太多)记录。
-
如果您要处理更常见的基于文本的格式中的结构化数据,您可能需要使用clue/reactphp-csv来处理逗号分隔值(CSV)文件(
.csv
文件扩展名)。