myvon/reactphp-rdkafka

使用 ReactPHP 事件循环的 RdKafka 消费者和生产者实现

1.0 2023-02-15 22:03 UTC

This package is auto-updated.

Last update: 2024-09-16 01:57:06 UTC


README

RdKafka 实现与 ReactPHP 事件循环。

此库实现了来自 Arnaud-lbPHP RDKafka,并结合了 react/event-loopreact/stream,提供了一种非阻塞的事件驱动式消费者和生产者。

Latest Version on Packagist Tests Total Downloads

工作原理

此包使用来自 react/event-loop 的周期性定时器,以固定间隔消耗消息。为了防止阻塞,在消耗时将超时设置为0。

它还使用 react/stream 以事件驱动的方式接收和发送消息。通过监听流的数据事件来消耗消息。通过将数据写入相应的流来完成消息的生产。请参见下面的“消耗消息”和“生产消息”。

安装

您可以通过 composer 安装此包

composer require myvon/reactphp-kafka

请确保您的服务器上已安装 PHP RDKafka 扩展。

消耗消息

要消耗消息,首先通过传递应用程序的名称(用于 kafka 的 group.id 配置,有关更多信息,请参阅 消费者组 ID(一般))和您的代理列表来创建一个 Configuration 对象。

use Myvon\Kafka\Configuration;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);

然后,您可以通过创建一个 "Myvon\Kafka\Consumer" 实例

use Myvon\Kafka\Consumer;

$consumer = new Consumer($configuration->consumer());

使用 Configuration::consumer() 生成一个具有正确配置的 RdKafka\Conf 实例,适用于消费者。

然后,您可以通过调用消费者的 start 方法并传递您想要订阅的主题列表来开始消耗消息

$stream = $consumer->start(['topic']);

此方法将返回一个 ThroughStream 实例,允许您监听 data 事件以接收消息

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Consumer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$consumer = new Consumer($configuration->consumer());
$stream = $consumer->start(['topic']);

$stream->on('data', function($data) {
    $topic = $data['topic'];
    $message = $data['payload'];
    
    //... do whatever you want here 
});

$data 参数将包含以下键

  • topic:包含消息来自的主题的名称
  • payload:包含接收到的消息

处理消费者错误

当接收到错误 RD_KAFKA_RESP_ERR_NO_ERROR 时,消费者将向流写入每条消息。

RD_KAFKA_RESP_ERR__TIMED_OUTRD_KAFKA_RESP_ERR__PARTITION_EOF 将被忽略。

其他所有错误将通过 error 事件发送

$stream->on("error", function(Exception $exception) {
    $errorStr = $exception->getMessage();
    $errorCode = $exception->getCode();
    // handle the error here
});

此包不处理错误,它只是将它们传递给您的应用程序。处理错误由您负责。

消费者超时和周期性计时器

默认情况下,传递给 RdKafka\KafkaConsumerconsume 方法的超时设置为0。这防止了该方法阻塞脚本的执行。如果您仍然想要设置超时,可以通过传递所需的超时时间(以毫秒为单位)到 setConsumeTimeout 方法来实现

$consumer->setConsumeTimeout(1000); // 1 second

请注意,这将影响事件循环!

默认情况下,消费者将每秒查找一次消息。您可以通过传递新的计时器到 setTimerPeriod 方法来设置此计时器

$consumer->setTimerPeriod(0.1); // 100 ms

注意:它内部使用事件循环的 addPeriodicTimer 方法,因此计时器以秒为单位。

访问 KafkaConsumer 实例

如果您需要直接访问KafkaConsumer实例,可以通过调用getConsumer来实现。

$kafkaConsumer = $consumer->getConsumer();

生产消息

与消费者类似,您需要创建配置对象,并在实例化时将其传递给Myvon\Kafka\Producer

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['topicName']);

您可以将多个主题传递给start方法。调用start会根据您想要发布的主题返回一个ThroughStream实例。您可以通过以下方式访问特定主题的流:

  • 通过start方法返回的数据访问它:$streams['topicName']
  • 通过调用getStream('topicName')获取特定主题的ThroughStream实例。

然后您可以将消息写入流中,这些消息将被发送到相应的主题。

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic']);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

注意:生产者将通过每500毫秒调用一次poll()来确保每条消息都被发送。可以通过调用setPollInterval来更改此延迟。当流关闭时,生产者将对每条消息调用pollflush以确保不丢失任何数据。

我不想在生产消息时使用事件循环

有时,您可能希望不使用事件循环直接发送消息。这可以通过将false作为start方法的第二个参数传递来实现。

use Myvon\Kafka\Configuration;
use Myvon\Kafka\Producer;

$configuration = new Configuration("appName", ["127.0.0.1:9092"]);
$producer = new Producer($configuration->producer());
$streams = $producer->start(['myFirstTopic', 'mySecondTopic'], false);

$streams['myFirstTopic']->write('Hello First Topic !');
$streams['mySecondTopic']->write('Hello Second Topic !');

$producer->getStream('myFirstTopic')->write('Hello sent with getStream !');

这将禁用生产者中循环的所有使用,并在最后一行之后立即退出。为了避免丢失消息,在类销毁时会关闭流,这是通过__destruct()方法实现的。

优雅地停止消费者和生产者

如果您想在代码结束之前停止消费者或生产者,可以调用每个类的stop()方法。这将删除所有周期性定时器并关闭所有流。

这样做,您将接收到消费者或生产者使用的每个流的endclose事件。

使用自定义配置选项

您可以将自定义配置选项传递给由Myvon\Kafka\Configuration构造函数生成的RdKafka\Conf实例,将它们作为数组传递给构造函数的第三个参数。

$configuration = new Configuration('appName', ['127.0.0.1:9092'], ['enable.partition.eof' => 'false']);

使用自定义循环

如果您不想消费者或生产者使用默认循环,可以将它作为每个类的构造函数的第二个参数传递。

$loop = new AnotherLoopInstance();
$producer = new Producer($configuration->producer(), $loop);
$consumer = new Consumer($configuration->consumer(), $loop);

注意:将循环传递给生产者将强制使用循环,即使您将false作为start方法的第二个参数传递。

测试

composer test

贡献

有关详细信息,请参阅CONTRIBUTING

许可

MIT许可证(MIT)。有关更多信息,请参阅许可文件