toalett / react-amqp-stream
在ReactPHP中将AMQP消息作为流处理
Requires
- php: ^7.4
- php-amqplib/php-amqplib: ^2.0
- toalett/react-stream-adapter: ^1.0
README
欢迎来到Toalett,一个谦逊的倡议。Toalett是挪威语中的“厕所”。
什么是toalett/react-amqp-stream
?
这是一个库,允许您将AMQP消息队列作为ReactPHP中的可读流进行交互。它非常轻量级——它的唯一依赖项是toalett/react-stream-adapter
和php-amqplib/php-amqplib
。
类AMQPSource
实现了来自toalett/react-stream-adapter
的Toalett\React\Stream\Source
接口。它需要一个PhpAmqpLib\Channel\AMQPChannel
实例和要读取的队列名称。您可以通过将Options
对象作为构造函数的第三个参数传递,为AMQPChannel::basic_consume()
调用提供额外的选项。
Toalett\React\Stream\StreamAdapter
包装了一个Source
,使其在事件循环中作为一个可读流变得可用。
安装
它可在Packagist上找到
composer require toalett/react-amqp-stream
动机
注意:这个动机与toalett/react-stream-adapter
中给出的相同。
我正在做一个需要以非阻塞方式响应用户消息的项目。该应用程序使用事件循环。最初我使用周期性计时器和一个回调函数,但随着应用程序的增长,这变得混乱不堪。慢慢地,将消息队列视为流的感觉变得更自然。如果你这样考虑的话,这就有意义了
在计算机科学中,流是一系列随时间提供的数据元素。可以将流视为传送带上依次处理的物品,而不是大批量处理。
—— 流(计算机)在维基百科
这个定义适用于消息队列。
在我提到的那个项目中,我使用这个库每10秒轮询一个AMQP队列。这使我保持低负载,并在同时做其他事情。这种抽象证明非常有用,所以我认为其他人也可能喜欢它。
我如何使用它?
库尽可能地不干涉您的操作。从AMQP连接到可读流只需要几行代码!😀
- 使用PhpAmqpLib创建到AMQP主机的连接,并获取一个通道。
- 将通道实例和要消费的队列名称传递给
AMQPSource
的构造函数,可选地传递Options
实例。 - 使用
React\EventLoop\Factory
创建一个事件循环。 - 创建一个
StreamAdapter
,并将AMQPSource
以及事件循环传递给构造函数。 - 就像与任何其他的
ReadableInputStream
一样与适配器交互。
让我们通过两个示例来看看实际操作。
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use React\EventLoop\Factory as EventLoopFactory; use Toalett\React\AMQP\AMQPSource; use Toalett\React\Stream\StreamAdapter; $channel = (new AmqpStreamConnection(/* ... */))->channel(); $queueName = 'my-app.work-queue'; $amqpSource = new AMQPSource($channel, $queueName); $eventLoop = EventLoopFactory::create(); $stream = new StreamAdapter($amqpSource, $eventLoop); $stream->on('data', fn(AMQPMessage $m) => /* ... */); $stream->on('error', fn(RuntimeException $e) => /* ... */); $eventLoop->run();
正如您所看到的,从 AMQP 连接或通道到可读流只需要 2 行代码。
Options
类提供了一种将参数传递给 AMQPChannel::basic_consume
调用的方式。一旦您将 Options
实例传递给 AMQPSource
的构造函数,您就无法更改它们;Options
实例被克隆以防止意外行为。
use Toalett\React\AMQP\AMQPSource; use Toalett\React\AMQP\Options; // ... $options = (new Options) ->setConsumerTag('worker.1') ->setNoAck(true); $amqpSource = new AMQPSource($channel, $queueName, $options); // ...
如果您没有提供 消费者标签,服务器将分配一个。您可以使用 getConsumerTag()
从 AMQPSource
获取此消费者标签。
查看 examples 文件夹以获取一些简单的实现。它们与这个说明书中给出的示例没有太大的不同。您可以随意玩弄它们。
注意:由于流适配器库在底层使用轮询,因此与延迟有关的一些妥协。有关此主题的更多信息,请参阅 toalett/react-stream-adapter
。
问题
问题:如何处理流错误?
答案:传递给 on('error', ...)
回调的 RuntimeException 包含了由 Source
实际抛出的异常。在 RuntimeException 上调用 getPrevious()
可以获得原始异常。
问题:测试在哪里?
答案:可能会稍后添加测试。如果您对缺少测试感到困扰,请随意创建一个问题!