toalett / react-stream-adapter
使用ReactPHP将任何数据源作为流使用
Requires
- php: ^7.4
- react/stream: ^1.0
This package is auto-updated.
Last update: 2024-09-07 01:43:48 UTC
README
欢迎来到Toalett,一个谦逊的倡议。Toalett是挪威语中“厕所”的意思 💩。
toalett/react-stream-adapter
是什么?
这是一个库,允许任何数据源作为ReactPHP的流使用。它非常小巧——只有一个接口、一个类和一个特质。它的唯一依赖是react/stream
。
StreamAdapter
接受一个实现Source
接口的实例,并将其作为流在支持事件循环的应用程序中使用。
安装
它可在Packagist上找到
composer require toalett/react-stream-adapter
动机
我正在做一个项目,该项目需要应用程序以非阻塞方式响应用MQP消息。该应用程序使用事件循环。最初我使用一个周期性定时器和一个回调函数,但随着应用程序的增长,这变得杂乱无章。慢慢地,将消息队列视为流的感觉更加自然。如果你这么想的话,这很有意义
在计算机科学中,流是一系列随时间提供的元素序列。可以将流想象为传送带上的物品,一次处理一个,而不是大量处理。
这个定义适用于消息队列。
在我之前提到的项目中,我使用这个库每隔10秒轮询一次AMQP队列。这使我负载较低,同时允许我在此期间做其他事情。这种抽象证明非常有用,所以我认为其他人也可能喜欢它。
如何使用它?
只有三个组件需要关注,其中一个是可以选的!主要组件是Source
接口和StreamAdapter
类。可选组件是EndlessTrait
,它可以在无限源中使用。
使用此库所需的步骤如下
- 定义一个能够生成或提供一些数据的类。它必须实现
Source
接口。 select()
方法会定期被调用。这是你返回下一份数据的地方。确保当有数据可用时(可以是任何东西),select()
方法返回的不是null
,当没有数据时返回null
。你可以对你的select()
实现添加类型提示,例如select(): ?string
或select(): ?MyData
以提高清晰度。- 该接口还指定了
close(): void
和eof(): bool
方法。在一个无限(无穷)流中,close()
可以留空,而eof()
应返回false(EOF永远无法到达)。EndlessTrait
提供了这些实现。 - 使用
StreamAdapter
将您的Source
附加到循环中。 - 将适配器当作任何其他
ReadableInputStream
来交互。
注意:该库在底层使用轮询。默认轮询间隔为0.5秒,但如果检查数据是一项密集操作,您可能想稍微增加间隔以防止减慢速度。这是响应速度和负载之间的权衡。可以通过将它们作为StreamAdapter
构造函数的第三个参数传递来自定义间隔。
注意:StreamAdapter
会从源中贪婪地读取数据 - 它不会停止读取直到没有可读取的内容。这防止了在高轮询间隔时发生拥塞,但它可能会在读取大量数据或您的select()
例程需要一些时间时阻塞执行。
$loop = Factory::create(); $source = new MySource(); // implements Source $stream = new StreamAdapter($source, $loop); $stream->on('data', function(MyData $data) { /* do something with data */ }); $loop->run();
请查看示例文件夹中的简单实现。
问题
Q: 处理AMQP消息的代码在哪里?
A:它将作为一个独立的包发布,但在发布之前还需要做一些工作。
Q: 测试在哪里?
A:只有一个类,它主要基于react/stream
中的ReadableResourceStream
。测试可能会稍后添加,但截至目前,我并不认为它很有价值。如果您对此感到困扰,请随时创建一个问题!