anktx / kafka-client
PHP wrapper for RdKafka
0.1.4
2024-08-04 09:48 UTC
Requires
- php: ^8.1
- ext-rdkafka: *
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.59
- phpstan/phpstan: ^1.11
- phpunit/phpunit: ^10.5
README
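This package provides a robust and flexible client for working with Apache Kafka from PHP. It aims to simplify producing and consuming messages so that developers can integrate Kafka into their PHP applications more easily.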
Requirements
- PHP 8.1 or later
- ext-rdkafka
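The two requirements above can be checked before installing. The following is a minimal sketch using only built-in PHP functions (`version_compare`, `extension_loaded`); the function name `kafka_client_requirement_problems` is hypothetical and is not part of anktx/kafka-client.

```php
<?php

declare(strict_types=1);

/**
 * Returns a list of human-readable problems; an empty list means both
 * requirements are met. Hypothetical helper, not provided by the package.
 */
function kafka_client_requirement_problems(string $phpVersion, bool $rdkafkaLoaded): array
{
    $problems = [];

    // The package declares php: ^8.1, so anything below 8.1.0 is rejected.
    if (version_compare($phpVersion, '8.1.0', '<')) {
        $problems[] = "PHP 8.1 or later is required, found {$phpVersion}.";
    }

    if (!$rdkafkaLoaded) {
        $problems[] = 'The rdkafka extension (ext-rdkafka) is not loaded.';
    }

    return $problems;
}

// Check the interpreter that is running this script.
foreach (kafka_client_requirement_problems(PHP_VERSION, extension_loaded('rdkafka')) as $problem) {
    echo $problem, PHP_EOL;
}
```

Passing the version and extension state as arguments keeps the check easy to exercise without changing the environment.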
Installation
composer require anktx/kafka-client
General usage
Producer

    <?php

    use Anktx\Kafka\Client\Config\ProducerConfig;
    use Anktx\Kafka\Client\KafkaProducer;
    use Anktx\Kafka\Client\Message\KafkaProducerMessage;

    $kafkaProducer = new KafkaProducer(
        new ProducerConfig(
            brokers: 'kafka:9092',
            /* >>> the rest are optional <<<
            queueBufferingMaxKBytes: 2048,
            batchSize: 1024,
            lingerMs: 10,
            compressionType: CompressionType::snappy,
            isDebug: true,
            logger: new \Psr\Log\NullLogger(),
            */
        )
    );

    $kafkaProducer->produce(
        new KafkaProducerMessage(
            topic: 'topic',
            body: 'message body',
            /* >>> the rest are optional <<<
            partition: 1,
            key: 'key',
            headers: ['name' => 'value'],
            */
        )
    );

    // Flush messages that are still queued before the script exits.
    $kafkaProducer->flush();
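The producer example passes `body` as a plain string. When the application data is structured, it has to be serialized first. The sketch below assumes JSON as the wire format, which the README does not prescribe; `encode_message_body` is a hypothetical helper, not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: serializes an application payload to a JSON string
 * that could be passed as the `body` argument of a producer message.
 */
function encode_message_body(array $payload): string
{
    // JSON_THROW_ON_ERROR raises JsonException on failure instead of
    // returning false, so a broken payload is never sent silently.
    return json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

$body = encode_message_body(['orderId' => 42, 'status' => 'paid']);
echo $body, PHP_EOL; // prints {"orderId":42,"status":"paid"}
```

The resulting string would take the place of `'message body'` in the producer example.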
Consumer

    <?php

    use Anktx\Kafka\Client\Config\ConsumerConfig;
    use Anktx\Kafka\Client\KafkaConsumer;
    use Anktx\Kafka\Client\Subscription\Subscription;
    use Anktx\Kafka\Client\Subscription\SubscriptionList;

    $kafkaConsumer = new KafkaConsumer(
        new ConsumerConfig(
            brokers: 'kafka:9092',
            groupId: 'groupId',
            instanceId: '1',
            /* >>> the rest are optional <<<
            offsetReset: OffsetReset::latest,
            autoCommitMs: 1000,
            sessionTimeoutMs: 10000,
            isDebug: true,
            logger: new \Psr\Log\NullLogger(),
            */
        )
    );

    $kafkaConsumer->subscribe(
        new SubscriptionList(
            new Subscription(topic: 'topic1'),
            // new Subscription(topic: 'topic2', partition: 1),
        ),
    );

    $messagesToConsume = 100;

    // Consume exactly $messagesToConsume messages.
    for ($i = 0; $i < $messagesToConsume; $i++) {
        $message = $kafkaConsumer->consume();

        echo $message->body;
        print_r($message->headers);

        // do_some_processing() is a placeholder for application code.
        do_some_processing($message->body);

        // Commit only after the message has been processed.
        $kafkaConsumer->commit($message);
    }
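The consumer example calls `do_some_processing()`, which the README leaves undefined. One possible stand-in is sketched below. It assumes that message bodies are JSON documents, which is an assumption rather than something the package requires, and it uses only built-in PHP functions.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the do_some_processing() placeholder.
 * Assumes the body is a JSON object or array and returns it decoded.
 */
function do_some_processing(string $body): array
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        throw new InvalidArgumentException('Message body is not valid JSON: ' . $e->getMessage(), 0, $e);
    }

    // Scalars such as "42" are valid JSON but not what this sketch expects.
    if (!is_array($data)) {
        throw new InvalidArgumentException('Expected a JSON object or array in the message body.');
    }

    return $data;
}

print_r(do_some_processing('{"orderId":42,"status":"paid"}'));
```

Because the example loop commits after processing, an exception thrown here would stop the loop before that message is committed. Whether to skip, retry, or halt on a malformed message is an application-level decision.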