coste / kafka-iterator
让您通过简单的 foreach 循环消费 Kafka 主题
v0.2.1
2021-11-30 14:28 UTC
Requires
- ext-rdkafka: ~4.0|~5.0
README
什么是 KafkaConsumerIterator.php?
KafkaConsumerIterator.php 是一个用于 Rdkafka 扩展的抽象层,用 PHP 编写。
介绍
使用 Rdkafka 使得您的代码与一个不太稳定的 API 相耦合… 此库提供了一个符合 PHP 迭代器接口 的类,因此消费 Kafka 主题就像 foreach 循环一样简单,并且可以与其他迭代器结合使用。
这可以简化您的业务逻辑和单元测试,因为没有什么比用另一个 可遍历 对象替换一个对象更容易了。
安装
composer require coste/kafka-iterator
示例
经典用法
<?php
use Coste\Kafka;
$consumer = new ConsumerIterator([
'topics' => [
'topic_1',
'topic_2',
],
'group_id' => 'consumer1',
'brokers' => [
'127.0.0.1:9092',
'127.0.0.2:9092',
],
]);
foreach ($consumer as $key => $message) {
try {
// 1. Processing a message
my_complex_process_which_can_fail($message);
} catch (RuntimeError $e) {
// 2. Some exception in my process occured
put_the_message_somewhere($message);
} catch (Exception $e) {
// 3. If I want, I can still break the loop.
// If I restart the loop, this message will be read again
// as it was not treated.
break;
}
// 4. Each loop commits the offset
}
切换迭代器
<?php
use Coste\Kafka;
if ($topics) {
$it = new ConsumerIterator([
'topics' => $topics,
// …
]);
} else {
$it = new \ArrayIterator([
"message 1",
"message 2",
]);
}
foreach ($it as $message) {
// …
}
装饰迭代器
<?php
use Coste\Kafka;
$it = new ConsumerIterator([
'topics' => $topics,
// …
]);
$it = new \CallbackFilterIterator($it, function($current) {
return !empty($current);
});
$it = new \CachingIterator($it);
foreach($it as $message) {
// …
}
使用 SSL 安全性
<?php
$consumer = new ConsumerIterator([
'topics' => [
'topic_1',
'topic_2'
],
'group_id' => 'consumer1',
'brokers' => [
'127.0.0.1:9092',
'127.0.0.2:9092',
],
'ssl' => [
'ca' => [
'location' => '/etc/my_ca.pem',
],
'certificate' => [
'location' => '/etc/my_certificate.pem',
],
'key' => [
'location' => '/etc/my_key.pem'],
// add key password, only if needed
'password' => 'aabbcc123',
],
],
]);
// …
贡献
如果您有任何改进的想法或遇到任何错误,请通过 bugtracker 或发送电子邮件到 charles-edouard@coste.dev 告诉我。为了避免知识产权问题并确保我可以在这个代码下保持自由(作为自由)许可,贡献代码将不接受,直到我找到一个简单的贡献者许可协议流程。
许可证
版权 (C) 2019-2021 Charles-Édouard Coste
本程序是自由软件:您可以按照自由软件基金会发布的 GNU Affero 通用公共许可证的条款重新分发和/或修改它,许可证版本为 3 或(按您的选择)任何更高版本。
本程序是在希望它有用的前提下分发的,但没有任何保证;甚至没有关于适销性或特定用途的隐含保证。有关详细信息,请参阅 GNU Affero 通用公共许可证。
您应该已经收到了本程序的 GNU Affero 通用公共许可证副本。如果没有,请参阅 https://gnu.ac.cn/licenses/。