toalett/react-amqp-stream

在ReactPHP中将AMQP消息作为流处理

1.0 2021-01-31 12:52 UTC

This package is auto-updated.

Last update: 2024-09-29 05:22:33 UTC


README

欢迎来到Toalett,一个谦逊的倡议。Toalett是挪威语中的“厕所”。

什么是toalett/react-amqp-stream

这是一个库,允许您将AMQP消息队列作为ReactPHP中的可读流进行交互。它非常轻量级——它的唯一依赖项是toalett/react-stream-adapterphp-amqplib/php-amqplib

AMQPSource实现了来自toalett/react-stream-adapterToalett\React\Stream\Source接口。它需要一个PhpAmqpLib\Channel\AMQPChannel实例和要读取的队列名称。您可以通过将Options对象作为构造函数的第三个参数传递,为AMQPChannel::basic_consume()调用提供额外的选项。

Toalett\React\Stream\StreamAdapter包装了一个Source,使其在事件循环中作为一个可读流变得可用。

安装

它可在Packagist上找到

composer require toalett/react-amqp-stream

动机

注意:这个动机与toalett/react-stream-adapter中给出的相同。

我正在做一个需要以非阻塞方式响应用户消息的项目。该应用程序使用事件循环。最初我使用周期性计时器和一个回调函数,但随着应用程序的增长,这变得混乱不堪。慢慢地,将消息队列视为流的感觉变得更自然。如果你这样考虑的话,这就有意义了

在计算机科学中,流是一系列随时间提供的数据元素。可以将流视为传送带上依次处理的物品,而不是大批量处理。

—— 流(计算机)在维基百科

这个定义适用于消息队列。

在我提到的那个项目中,我使用这个库每10秒轮询一个AMQP队列。这使我保持低负载,并在同时做其他事情。这种抽象证明非常有用,所以我认为其他人也可能喜欢它。

我如何使用它?

库尽可能地不干涉您的操作。从AMQP连接到可读流只需要几行代码!😀

  1. 使用PhpAmqpLib创建到AMQP主机的连接,并获取一个通道。
  2. 将通道实例和要消费的队列名称传递给AMQPSource的构造函数,可选地传递Options实例。
  3. 使用 React\EventLoop\Factory 创建一个事件循环。
  4. 创建一个 StreamAdapter,并将 AMQPSource 以及事件循环传递给构造函数。
  5. 就像与任何其他的 ReadableInputStream 一样与适配器交互。

让我们通过两个示例来看看实际操作。

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use React\EventLoop\Factory as EventLoopFactory;
use Toalett\React\AMQP\AMQPSource;
use Toalett\React\Stream\StreamAdapter;

$channel = (new AmqpStreamConnection(/* ... */))->channel();
$queueName = 'my-app.work-queue';
$amqpSource = new AMQPSource($channel, $queueName);

$eventLoop = EventLoopFactory::create();
$stream = new StreamAdapter($amqpSource, $eventLoop);

$stream->on('data', fn(AMQPMessage $m) => /* ... */);
$stream->on('error', fn(RuntimeException $e) => /* ... */);

$eventLoop->run();

正如您所看到的,从 AMQP 连接或通道到可读流只需要 2 行代码。

Options 类提供了一种将参数传递给 AMQPChannel::basic_consume 调用的方式。一旦您将 Options 实例传递给 AMQPSource 的构造函数,您就无法更改它们;Options 实例被克隆以防止意外行为。

use Toalett\React\AMQP\AMQPSource;
use Toalett\React\AMQP\Options;

// ...
$options = (new Options)
    ->setConsumerTag('worker.1')
    ->setNoAck(true);

$amqpSource = new AMQPSource($channel, $queueName, $options);
// ...

如果您没有提供 消费者标签,服务器将分配一个。您可以使用 getConsumerTag()AMQPSource 获取此消费者标签。

查看 examples 文件夹以获取一些简单的实现。它们与这个说明书中给出的示例没有太大的不同。您可以随意玩弄它们。

注意:由于流适配器库在底层使用轮询,因此与延迟有关的一些妥协。有关此主题的更多信息,请参阅 toalett/react-stream-adapter

问题

问题:如何处理流错误?
答案:传递给 on('error', ...) 回调的 RuntimeException 包含了由 Source 实际抛出的异常。在 RuntimeException 上调用 getPrevious() 可以获得原始异常。

问题:测试在哪里?
答案:可能会稍后添加测试。如果您对缺少测试感到困扰,请随意创建一个问题!