anktx/kafka-client

RdKafka的PHP包装器

0.1.4 2024-08-04 09:48 UTC

This package is auto-updated.

Last update: 2024-09-04 10:24:49 UTC


README

此包为使用PHP与Apache Kafka交互提供了强大且灵活的客户端。它的目标是简化消息生产和消费的过程,使得开发人员更容易将Kafka集成到他们的PHP应用程序中。

要求

  • PHP 8.1或更高版本。
  • ext-rdkafka

安装

composer require anktx/kafka-client

通用用法

生产者

use Anktx\Kafka\Client\Config\ProducerConfig;
use Anktx\Kafka\Client\KafkaProducer;
use Anktx\Kafka\Client\Message\KafkaProducerMessage;

$kafkaProducer = new KafkaProducer(
    new ProducerConfig(
        brokers: 'kafka:9092',
        /*  >>> the rest are optional <<<
        queueBufferingMaxKBytes: 2048,
        batchSize: 1024,
        lingerMs: 10,
        compressionType: CompressionType::snappy,
        isDebug: true,
        logger: new \Psr\Log\NullLogger(),
        */
    )
);

$kafkaProducer->produce(
    new KafkaProducerMessage(
        topic: 'topic',
        body: 'message body',
        /*  >>> the rest are optional <<<
        partition: 1,
        key: 'key',
        headers: ['name' => 'value'],
        */
    )
);

$kafkaProducer->flush();

消费者

<?php

use Anktx\Kafka\Client\Config\ConsumerConfig;
use Anktx\Kafka\Client\KafkaConsumer;
use Anktx\Kafka\Client\Subscription\Subscription;
use Anktx\Kafka\Client\Subscription\SubscriptionList;

$kafkaConsumer = new KafkaConsumer(
    new ConsumerConfig(
        brokers: 'kafka:9092',
        groupId: 'groupId',
        instanceId: '1',
        /*  >>> the rest are optional <<<
        offsetReset: OffsetReset::latest,
        autoCommitMs: 1000,
        sessionTimeoutMs: 10000,
        isDebug: true,
        logger: new \Psr\Log\NullLogger(),
        */
    )
);

$kafkaConsumer->subscribe(
    new SubscriptionList(
        new Subscription(topic: 'topic1'),
        // new Subscription(topic: 'topic2', partition: 1),
    ),
);

$messagesToConsume = 100;
$i = 0;

while (++$i < $messagesToConsume) {
    $message = $kafkaConsumer->consume();

    echo $message->body;
    print_r($message->headers);
    
    do_some_processing($message->body);

    $kafkaConsumer->commit($message);
}