fatih/tuzlu / php-apache-kafka

您可以使用此包来管理多数据

v1.0.0 2024-03-10 12:37 UTC

This package is auto-updated.

Last update: 2024-09-10 16:24:40 UTC


README

PHP Kafka CDC 是一个由 Fatih Tuzlu 开发的包,用于在 PHP 应用程序中管理 Apache Kafka 的多数据。它提供了使用 Kafka 消费和产生消息的功能。

注意

不要忘记安装 rdkafka 库

MacOS

brew install librdkafka

Debian

$ apt install librdkafka-dev

RedHat、CentOS、Fedora

$ yum install librdkafka-devel

Windows

# Install vcpkg if not already installed
$ git clone https://github.com/Microsoft/vcpkg.git
$ cd vcpkg
$ ./bootstrap-vcpkg.sh
$ ./vcpkg integrate install

# Install librdkafka
$ vcpkg install librdkafka

详细说明

You can visit https://github.com/confluentinc/librdkafka

安装

您可以通过 Composer 安装 PHP Kafka CDC。在终端中运行以下命令:

composer require fatihtuzlu/php-apache-kafka

需求

PHP Kafka CDC 需要以下依赖项

idealo/php-rdkafka-ffi: ^0.5.0
php: ^8.0.0

用法

消费者

要从 Kafka 主题消费消息,您可以使用 ConsumerManager 类。以下是如何设置消费者的基本示例

use Fatihtuzlu\PHPKafkaCDC\ConsumerManager;

// Initialize consumer manager
$consumer = new ConsumerManager(
    $topic,       // Kafka topic to consume from
    $groupId,     // Consumer group ID
    $bootstrapServers // Kafka bootstrap servers
);

// Fetch messages from Kafka
$message = $consumer->fetchMessages($topic, $partition, $offset, $timeMs);

生产者

要将消息发送到 Kafka 主题,您可以使用 ProducerManager 类。以下是如何设置生产者的基本示例

use Fatihtuzlu\PHPKafkaCDC\ProducerManager;

// Initialize producer manager
$producer = new ProducerManager(
    $topic,             // Kafka topic to produce to
    $bootstrapServers   // Kafka bootstrap servers
);

// Send a message to Kafka
$producer->sendMessages($message, $flush = 1000, $debug = false);

配置

调试

您可以通过调用 debug() 方法来启用 Kafka 连接的调试

$producer->debug();

额外的代理

$producer->addBrokers($brokerList);

示例

示例消费者

use Fatihtuzlu\PHPKafkaCDC\ConsumerManager;

// Kafka broker and topic information
$bootstrapServers = "localhost:9092";
$topic = "my_topic";
$groupId = "my_group";

// Initialize the consumer
$consumer = new ConsumerManager($topic, $groupId, $bootstrapServers);

// Fetch messages
$message = $consumer->fetchMessages($topic, 0, RD_KAFKA_OFFSET_BEGINNING);

// Process the received message
if ($message !== null) {
    echo "Received message: " . $message->payload . "\n";
} else {
    echo "No messages available.\n";
}

示例生产者

use Fatihtuzlu\PHPKafkaCDC\ProducerManager;

// Kafka broker and topic information
$bootstrapServers = "localhost:9092";
$topic = "my_topic";

// Initialize the producer
$producer = new ProducerManager($topic, $bootstrapServers);

// Send a message
$messageToSend = "Hello, Kafka!";
$producer->sendMessages($messageToSend);

echo "Message sent to Kafka: " . $messageToSend . "\n";

在这些示例中,消费者从 Kafka 获取消息,而生产者向 Kafka 发送消息。my_topic 代表 Kafka 中的目标主题。请确保根据您自己的 Kafka 集群更新配置设置。