myvon / reactphp-rdkafka
使用 ReactPHP 事件循环的 RdKafka 消费者和生产者实现
Requires
- ext-rdkafka: *
- react/stream: ^1.2
Requires (Dev)
- pestphp/pest: ^1.22
- pestphp/pest-plugin-mock: ^1.0
README
RdKafka 实现与 ReactPHP 事件循环。
此库实现了来自 Arnaud-lb 的 PHP RDKafka,并结合了 react/event-loop 和 react/stream,提供了一种非阻塞的事件驱动式消费者和生产者。
工作原理
此包使用来自 react/event-loop 的周期性定时器,以固定间隔消耗消息。为了防止阻塞,在消耗时将超时设置为0。
它还使用 react/stream 以事件驱动的方式接收和发送消息。通过监听流的数据事件来消耗消息。通过将数据写入相应的流来完成消息的生产。请参见下面的“消耗消息”和“生产消息”。
安装
您可以通过 composer 安装此包
composer require myvon/reactphp-kafka
请确保您的服务器上已安装 PHP RDKafka 扩展。
消耗消息
要消耗消息,首先通过传递应用程序的名称(用于 kafka 的 group.id
配置,有关更多信息,请参阅 消费者组 ID(一般))和您的代理列表来创建一个 Configuration
对象。
use Myvon\Kafka\Configuration; $configuration = new Configuration("appName", ["127.0.0.1:9092"]);
然后,您可以通过创建一个 "Myvon\Kafka\Consumer" 实例
use Myvon\Kafka\Consumer; $consumer = new Consumer($configuration->consumer());
使用 Configuration::consumer()
生成一个具有正确配置的 RdKafka\Conf
实例,适用于消费者。
然后,您可以通过调用消费者的 start
方法并传递您想要订阅的主题列表来开始消耗消息
$stream = $consumer->start(['topic']);
此方法将返回一个 ThroughStream
实例,允许您监听 data
事件以接收消息
use Myvon\Kafka\Configuration; use Myvon\Kafka\Consumer; $configuration = new Configuration("appName", ["127.0.0.1:9092"]); $consumer = new Consumer($configuration->consumer()); $stream = $consumer->start(['topic']); $stream->on('data', function($data) { $topic = $data['topic']; $message = $data['payload']; //... do whatever you want here });
$data
参数将包含以下键
topic
:包含消息来自的主题的名称payload
:包含接收到的消息
处理消费者错误
当接收到错误 RD_KAFKA_RESP_ERR_NO_ERROR
时,消费者将向流写入每条消息。
RD_KAFKA_RESP_ERR__TIMED_OUT
和 RD_KAFKA_RESP_ERR__PARTITION_EOF
将被忽略。
其他所有错误将通过 error
事件发送
$stream->on("error", function(Exception $exception) { $errorStr = $exception->getMessage(); $errorCode = $exception->getCode(); // handle the error here });
此包不处理错误,它只是将它们传递给您的应用程序。处理错误由您负责。
消费者超时和周期性计时器
默认情况下,传递给 RdKafka\KafkaConsumer
的 consume
方法的超时设置为0。这防止了该方法阻塞脚本的执行。如果您仍然想要设置超时,可以通过传递所需的超时时间(以毫秒为单位)到 setConsumeTimeout
方法来实现
$consumer->setConsumeTimeout(1000); // 1 second
请注意,这将影响事件循环!
默认情况下,消费者将每秒查找一次消息。您可以通过传递新的计时器到 setTimerPeriod
方法来设置此计时器
$consumer->setTimerPeriod(0.1); // 100 ms
注意:它内部使用事件循环的 addPeriodicTimer
方法,因此计时器以秒为单位。
访问 KafkaConsumer 实例
如果您需要直接访问KafkaConsumer实例,可以通过调用getConsumer
来实现。
$kafkaConsumer = $consumer->getConsumer();
生产消息
与消费者类似,您需要创建配置对象,并在实例化时将其传递给Myvon\Kafka\Producer
。
use Myvon\Kafka\Configuration; use Myvon\Kafka\Producer; $configuration = new Configuration("appName", ["127.0.0.1:9092"]); $producer = new Producer($configuration->producer()); $streams = $producer->start(['topicName']);
您可以将多个主题传递给start
方法。调用start
会根据您想要发布的主题返回一个ThroughStream
实例。您可以通过以下方式访问特定主题的流:
- 通过
start
方法返回的数据访问它:$streams['topicName']
- 通过调用
getStream('topicName')
获取特定主题的ThroughStream
实例。
然后您可以将消息写入流中,这些消息将被发送到相应的主题。
use Myvon\Kafka\Configuration; use Myvon\Kafka\Producer; $configuration = new Configuration("appName", ["127.0.0.1:9092"]); $producer = new Producer($configuration->producer()); $streams = $producer->start(['myFirstTopic', 'mySecondTopic']); $streams['myFirstTopic']->write('Hello First Topic !'); $streams['mySecondTopic']->write('Hello Second Topic !'); $producer->getStream('myFirstTopic')->write('Hello sent with getStream !');
注意:生产者将通过每500毫秒调用一次poll()
来确保每条消息都被发送。可以通过调用setPollInterval
来更改此延迟。当流关闭时,生产者将对每条消息调用poll
和flush
以确保不丢失任何数据。
我不想在生产消息时使用事件循环
有时,您可能希望不使用事件循环直接发送消息。这可以通过将false
作为start
方法的第二个参数传递来实现。
use Myvon\Kafka\Configuration; use Myvon\Kafka\Producer; $configuration = new Configuration("appName", ["127.0.0.1:9092"]); $producer = new Producer($configuration->producer()); $streams = $producer->start(['myFirstTopic', 'mySecondTopic'], false); $streams['myFirstTopic']->write('Hello First Topic !'); $streams['mySecondTopic']->write('Hello Second Topic !'); $producer->getStream('myFirstTopic')->write('Hello sent with getStream !');
这将禁用生产者中循环的所有使用,并在最后一行之后立即退出。为了避免丢失消息,在类销毁时会关闭流,这是通过__destruct()
方法实现的。
优雅地停止消费者和生产者
如果您想在代码结束之前停止消费者或生产者,可以调用每个类的stop()
方法。这将删除所有周期性定时器并关闭所有流。
这样做,您将接收到消费者或生产者使用的每个流的end
和close
事件。
使用自定义配置选项
您可以将自定义配置选项传递给由Myvon\Kafka\Configuration
构造函数生成的RdKafka\Conf
实例,将它们作为数组传递给构造函数的第三个参数。
$configuration = new Configuration('appName', ['127.0.0.1:9092'], ['enable.partition.eof' => 'false']);
使用自定义循环
如果您不想消费者或生产者使用默认循环,可以将它作为每个类的构造函数的第二个参数传递。
$loop = new AnotherLoopInstance(); $producer = new Producer($configuration->producer(), $loop); $consumer = new Consumer($configuration->consumer(), $loop);
注意:将循环传递给生产者将强制使用循环,即使您将false
作为start
方法的第二个参数传递。
测试
composer test
贡献
有关详细信息,请参阅CONTRIBUTING。
许可
MIT许可证(MIT)。有关更多信息,请参阅许可文件。