meitu/php-consumergroup

php kafka消费者

v1.0.4 2019-12-13 06:08 UTC

README

php-consumergroup是一个具有分组和重新平衡支持的Kafka消费者库。

中文文档

要求

  • Apache Kafka 0.7.x, 0.8.x, 0.9.x, 0.10.x

依赖

性能

78,000+条消息/秒(单进程)

更多详情 基准测试

示例

  • 通过composer安装此库
<?php 
use MTKafka\Consumer;

function call_back_func($msg) {
    echo "$msg->payload\n";
}

function handle_error_call_back($msg) {
    echo $msg->errstr();
}

$consumer = New Consumer("localhost:2181");
$consumer->setGroupId("group-test");
$consumer->setTopic("topic-test");
$consumer->setOffsetAutoReset(Consumer::SMALLEST);
$consumer->setErrHandler("handle_error_call_back");

try {
    $consumer->start("echo_message");
}
catch(Exception $e) {
    printf("error: %s\n", $e->getMessage());
}

查看example.php

消费者选项

Consumer::setMaxMessage()

数量,默认为32

如果分区数>1,则当达到最大消息数时,消费者将强制切换到其他分区,否则其他分区可能会被饿死

Consumer::setCommitInterval()

毫秒,默认为500ms

偏移量自动提交间隔。

Consumer::setWatchInterval()

毫秒,默认为10,000 ms

检查重新平衡的时间间隔。当分区数或消费者数量发生变化时,将触发重新平衡。

Consumer::setConsumeTimeout()

毫秒,默认是1,000 ms

Kafka请求超时。

Consumer::setClientId()

字符串,默认为"default"

客户端ID用于识别消费者。

Consumer::setOffsetAutoReset()

smallest|largest,默认为smallest

当偏移量在zookeeper中不存在或超出范围时,消费者可以选择获取最老的消息或最新的消息。

Consumer::setConf()

属性和值传入

我们可以使用此函数来修改librdkafka的配置。

有关librdkafka配置的更多详情。

异常

  • 可恢复异常(例如请求超时),将调用错误处理器,您可以记录错误消息。
  • 不可恢复异常(例如kafka/zookeeper损坏),将抛出异常,您应该记录消息并停止消费者。

基准测试

通过在单个分区产生20,000,000条消息来测量基准测试,并计算消费这些消息所需的时间。

当进程CPU利用率是100%时,QPS为78,000条消息/秒。