meitu / php-consumergroup
php kafka消费者
v1.0.4
2019-12-13 06:08 UTC
Requires (Dev)
- phpunit/phpunit: 5.6.*
This package is not auto-updated.
Last update: 2024-09-25 00:12:10 UTC
README
php-consumergroup是一个具有分组和重新平衡支持的Kafka消费者库。
要求
- Apache Kafka 0.7.x, 0.8.x, 0.9.x, 0.10.x
依赖
- php-zookeeper
- php_rdkafka(推荐使用1.0.0版本)
- librdkafka(推荐使用0.9.1版本)
性能
78,000+
条消息/秒(单进程)
更多详情 基准测试
示例
- 通过composer安装此库
<?php
use MTKafka\Consumer;
function call_back_func($msg) {
echo "$msg->payload\n";
}
function handle_error_call_back($msg) {
echo $msg->errstr();
}
$consumer = New Consumer("localhost:2181");
$consumer->setGroupId("group-test");
$consumer->setTopic("topic-test");
$consumer->setOffsetAutoReset(Consumer::SMALLEST);
$consumer->setErrHandler("handle_error_call_back");
try {
$consumer->start("echo_message");
}
catch(Exception $e) {
printf("error: %s\n", $e->getMessage());
}
消费者选项
Consumer::setMaxMessage()
数量,默认为32
如果分区数>1,则当达到最大消息数时,消费者将强制切换到其他分区,否则其他分区可能会被饿死
Consumer::setCommitInterval()
毫秒,默认为500ms
偏移量自动提交间隔。
Consumer::setWatchInterval()
毫秒,默认为10,000 ms
检查重新平衡的时间间隔。当分区数或消费者数量发生变化时,将触发重新平衡。
Consumer::setConsumeTimeout()
毫秒,默认是1,000 ms
Kafka请求超时。
Consumer::setClientId()
字符串,默认为"default"
客户端ID用于识别消费者。
Consumer::setOffsetAutoReset()
smallest|largest
,默认为smallest
当偏移量在zookeeper中不存在或超出范围时,消费者可以选择获取最老的消息或最新的消息。
Consumer::setConf()
属性和值传入
我们可以使用此函数来修改librdkafka的配置。
有关librdkafka配置的更多详情。
异常
- 可恢复异常(例如请求超时),将调用错误处理器,您可以记录错误消息。
- 不可恢复异常(例如kafka/zookeeper损坏),将抛出异常,您应该记录消息并停止消费者。
基准测试
通过在单个分区产生20,000,000
条消息来测量基准测试,并计算消费这些消息所需的时间。
当进程CPU利用率是100%
时,QPS为78,000
条消息/秒。