simpod / kafka
PHP Kafka 模板包装 RdKafka
0.2.4
2024-09-17 12:35 UTC
Requires
- php: ^8.2
- ext-pcntl: *
- ext-rdkafka: ^6
- psr/log: ^3
- thecodingmachine/safe: ^2.2
Requires (Dev)
- doctrine/coding-standard: ^12.0
- infection/infection: ^0.29.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: 1.12.3
- phpstan/phpstan-phpunit: 1.4.0
- phpstan/phpstan-strict-rules: 1.6.0
- phpunit/phpunit: ^11.0
- thecodingmachine/phpstan-safe-rule: ^1.0
This package is auto-updated.
Last update: 2024-09-20 19:30:09 UTC
README
安装
作为 Composer 依赖添加
composer require simpod/kafka
配置常量
提供了某些配置常量,如 ConsumerConfig
、ProducerConfig
或 CommonClientConfigs
。
然而,它们是从 Java API 复制的,并不都适用于 librdkafka。在使用前,请参考 librdkafka 文档。
客户端
消费者
KafkaConsumer
模板提供了 startBatch()
方法(用于补充 librdkafka 中的此示例)和 start()
方法。它们还为您处理终止信号。
经典消费者
<?php declare(strict_types=1); namespace Your\AppNamespace; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; final class ExampleConsumer { public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig(), Logger::get()); $kafkaConsumer->subscribe(['topic1']); $kafkaConsumer->start( 120 * 1000, static function (Message $message) use ($kafkaConsumer) : void { // Process message here $kafkaConsumer->commit($message); // Autocommit is disabled } ); } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092'); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name'); return $config; } }
批量消费者
<?php declare(strict_types=1); namespace Your\AppNamespace; use RdKafka\Message; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\ConsumerRecords; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; final class ExampleBatchConsumer { public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe(['topic1']); $kafkaConsumer->startBatch( 200000, 120 * 1000, static function (Message $message): void { // Process record }, static function (ConsumerRecords $consumerRecords) use ($kafkaConsumer) : void { // Process records batch $kafkaConsumer->commit($consumerRecords->getLast()); } ); } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092'); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name'); return $config; } }