grongor/kafka-rest-client

Apache Kafka的Confluent REST代理客户端


README

这是一个现代PHP客户端,用于Confluent REST代理(版本2)。

用于与API通信的HTTP客户端遵循现代的HTTP标准,如PSR-7PSR-17PSR-18

除了实现REST代理API,此库还添加了一些方便的类,这些类对于与Apache Kafka的交互非常有用 - 请参阅示例以获取完整列表和简单教程,说明如何使用它们。

缺少的功能

一些功能被有意跳过以简化实现/缩短开发时间。如果您需要任何东西并且愿意创建一个pull request,我会很高兴查看它(如果合理)并将其合并。您可以通过打开问题来开始讨论。

  • REST代理API的版本1
    • 升级REST代理很容易,因此我认为实现第一个版本没有意义。
  • AVRO嵌入格式
  • JSON嵌入格式
    • 我认为二进制格式足够了。您可以在与该库交互之前/之后始终序列化/反序列化您的对象,因此直接集成在这里只是增加了复杂性。
  • 异步操作
    • 为了简化库,所有方法都是同步的。如果需要,这肯定会改变。
  • 此处提到的所有尚未实现的功能(在此处)
    • 实现这些功能后,它们将尽快添加到这里。

示例

生产者

生产者允许您逐条或批量生产消息。两者都使用相同的底层API方法,因此使用批量方法更有效。

成功时,`produce` 和 `produceBatch` 都不返回任何内容,失败时抛出异常。可能会有部分成功/失败,因此抛出的异常 `FailedToProduceMessages` 包含两个公共属性,`$retryable` 和 `$nonRetryable`,其中每个都包含一个数组,`['error' => 'Kafka提供的错误', message => (生产给消息的对象)]`。错误是否可重试基于 `error_code`,如文档所述。您如何处理这些错误取决于您。

$producer = new Producer($restClient);
$producer->produce('your-topic', new Message('some-message'));

$messages = [new Message('some-message'), new Message('and-some-other-message')];
$producer->produceBatch('your-topic', $messages);

消费者

消费者允许您逐条消费消息,直到您从循环中返回、应用程序抛出异常或以其他方式被迫退出。`consume` 方法返回一个 生成器 并无限循环,在消息可用时产生消息。`consume` 方法接受两个参数:`timeout` 和 `maxBytes`。`timeout` 是消费者等待消息的最大时间。`maxBytes` 是单个请求中要获取的消息的最大大小。这两个设置与 `ConsumerOptions` 中的设置以及服务器设置互补(请参阅Kafka文档以获取更多信息)。

如果您将消费者选项 autoCommitEnable 设置为 false,则可以使用消费者的 commit 方法提交消息。只需传入您希望提交的消息。在大多数情况下,建议关闭自动提交并手动提交每条消息,这样您的应用程序在处理过程中死亡时就不会“丢失”任何消息。

您还可以(可选地)使用 setIdleCallback 方法设置一个空闲回调。当没有消息可提供时,将调用此回调。如果没有提供,则空闲间隔等于 timeout 参数,如果设置了 consumerRequestTimeoutMs 选项,则等于该选项,否则等于代理配置选项 consumer.request.timeout.ms

$consumerFactory = new ConsumerFactory($restClient);
$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic'));
foreach ($consumer->consume() as $message) {
    // Do your magic
    $logger->info('Got new message', $message->content);

    // ... and when you are done, commit the message (if you turned off auto-committing).
    $consumer->commit($message);
}

BatchConsumer

BatchConsumer 与 Consumer 的工作方式相同;不同之处在于 BatchConsumer 不单独提供每条消息,而是首先将它们放入批次(MessagesBatch)。这些批次可以通过消息数量(maxCount)、时间(maxDuration)或两者(设置 maxCountmaxDuration)进行配置来限制。如果您设置了 maxDuration,则批次将不会超过该时间(加一些处理时间),因为它改变了消费者的 timeout 参数(消费者不会在网络中等待更多消息而卡住)。

BatchConsumer 对于处理“大型”数据集非常有用,否则您必须自己批量处理消息(例如,在插入数据库时,批量操作总是更好的)。

如 Consumer 示例中所述,您可能需要提交消息。为此,有一个 commit 方法,它接受提供的 MessagesBatch 并在一个请求中提交其内部的所有消息。

$batchConsumerFactory = new BatchConsumerFactory($restClient);
$batchConsumer = $batchConsumerFactory->create(
    'your-consumer-group',
    Subscription::topic('your-topic'),
    $maxCount = 10000, // Yield the batch when there is 10 000 messages in it
    $maxDuration = 60  // or when 60 seconds passes, whichever comes first.
);
foreach ($batchConsumer->consume() as $messagesBatch) {
    // The batch might be empty if you specified the maxDuration.
    if ($messagesBatch->isEmpty()) {
        continue;
    }

    // Do your magic
    $database->insert($messagesBatch->getMessages());

    // ... and when you are done, commit the batch.
    $batchConsumer->commit($messagesBatch);
}