toalett/react-stream-adapter

使用ReactPHP将任何数据源作为流使用

1.0 2021-01-06 17:11 UTC

This package is auto-updated.

Last update: 2024-09-07 01:43:48 UTC


README

欢迎来到Toalett,一个谦逊的倡议。Toalett是挪威语中“厕所”的意思 💩。

toalett/react-stream-adapter是什么?

这是一个库,允许任何数据源作为ReactPHP的流使用。它非常小巧——只有一个接口、一个和一个特质。它的唯一依赖是react/stream

StreamAdapter接受一个实现Source接口的实例,并将其作为在支持事件循环的应用程序中使用。

安装

它可在Packagist上找到

composer require toalett/react-stream-adapter

动机

我正在做一个项目,该项目需要应用程序以非阻塞方式响应用MQP消息。该应用程序使用事件循环。最初我使用一个周期性定时器和一个回调函数,但随着应用程序的增长,这变得杂乱无章。慢慢地,将消息队列视为流的感觉更加自然。如果你这么想的话,这很有意义

在计算机科学中,流是一系列随时间提供的元素序列。可以将流想象为传送带上的物品,一次处理一个,而不是大量处理。

—— Wikipedia上的流(计算)

这个定义适用于消息队列。

在我之前提到的项目中,我使用这个库每隔10秒轮询一次AMQP队列。这使我负载较低,同时允许我在此期间做其他事情。这种抽象证明非常有用,所以我认为其他人也可能喜欢它。

如何使用它?

只有三个组件需要关注,其中一个是可以选的!主要组件是Source接口和StreamAdapter类。可选组件是EndlessTrait,它可以在无限源中使用。

使用此库所需的步骤如下

  1. 定义一个能够生成或提供一些数据的类。它必须实现Source接口。
  2. select()方法会定期被调用。这是你返回下一份数据的地方。确保当有数据可用时(可以是任何东西),select()方法返回的不是null,当没有数据时返回null。你可以对你的select()实现添加类型提示,例如select(): ?stringselect(): ?MyData以提高清晰度。
  3. 该接口还指定了close(): voideof(): bool方法。在一个无限(无穷)流中,close()可以留空,而eof()应返回false(EOF永远无法到达)。EndlessTrait提供了这些实现。
  4. 使用StreamAdapter将您的Source附加到循环中。
  5. 将适配器当作任何其他ReadableInputStream来交互。

注意:该库在底层使用轮询。默认轮询间隔为0.5秒,但如果检查数据是一项密集操作,您可能想稍微增加间隔以防止减慢速度。这是响应速度和负载之间的权衡。可以通过将它们作为StreamAdapter构造函数的第三个参数传递来自定义间隔。

注意:StreamAdapter会从源中贪婪地读取数据 - 它不会停止读取直到没有可读取的内容。这防止了在高轮询间隔时发生拥塞,但它可能会在读取大量数据或您的select()例程需要一些时间时阻塞执行。

$loop = Factory::create();

$source = new MySource(); // implements Source
$stream = new StreamAdapter($source, $loop);
$stream->on('data', function(MyData $data) {
    /* do something with data */
});

$loop->run();

请查看示例文件夹中的简单实现。

问题

Q: 处理AMQP消息的代码在哪里?
A:它将作为一个独立的包发布,但在发布之前还需要做一些工作。

Q: 测试在哪里?
A:只有一个类,它主要基于react/stream中的ReadableResourceStream。测试可能会稍后添加,但截至目前,我并不认为它很有价值。如果您对此感到困扰,请随时创建一个问题!