coste/kafka-iterator

让您通过简单的 foreach 循环消费 Kafka 主题

v0.2.1 2021-11-30 14:28 UTC

This package is auto-updated.

Last update: 2024-09-29 05:49:59 UTC


README

什么是 KafkaConsumerIterator.php?

KafkaConsumerIterator.php 是一个用于 Rdkafka 扩展的抽象层,用 PHP 编写。

介绍

使用 Rdkafka 使得您的代码与一个不太稳定的 API 相耦合… 此库提供了一个符合 PHP 迭代器接口 的类,因此消费 Kafka 主题就像 foreach 循环一样简单,并且可以与其他迭代器结合使用。

这可以简化您的业务逻辑和单元测试,因为没有什么比用另一个 可遍历 对象替换一个对象更容易了。

安装

composer require coste/kafka-iterator

示例

经典用法

<?php

use Coste\Kafka;

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2',
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
]);

foreach ($consumer as $key => $message) {
    try {

        // 1. Processing a message
        my_complex_process_which_can_fail($message);

    } catch (RuntimeError $e) {

        // 2. Some exception in my process occured
        put_the_message_somewhere($message);

    } catch (Exception $e) {

        // 3. If I want, I can still break the loop.
        //    If I restart the loop, this message will be read again
        //    as it was not treated.
        break;
    }

    // 4. Each loop commits the offset
}

切换迭代器

<?php

use Coste\Kafka;

if ($topics) {
    $it = new ConsumerIterator([
        'topics' => $topics,
        // …
    ]);
} else {
    $it = new \ArrayIterator([
        "message 1",
        "message 2",
    ]);
}

foreach ($it as $message) {
    // …
}

装饰迭代器

<?php

use Coste\Kafka;

$it = new ConsumerIterator([
    'topics' => $topics,
    // …
]);

$it = new \CallbackFilterIterator($it, function($current) {
    return !empty($current);
});
    
$it = new \CachingIterator($it);
    
foreach($it as $message) {
    // …
}

使用 SSL 安全性

<?php

$consumer = new ConsumerIterator([
    'topics' => [
        'topic_1',
        'topic_2'
    ],
    'group_id' => 'consumer1',
    'brokers' => [
        '127.0.0.1:9092',
        '127.0.0.2:9092',
    ],
    'ssl' => [
        'ca' => [
            'location' => '/etc/my_ca.pem',
        ],
        'certificate' => [
             'location' => '/etc/my_certificate.pem',
        ],
        'key' => [
            'location' => '/etc/my_key.pem'],

            // add key password, only if needed
            'password' => 'aabbcc123',
        ],
    ],
]);

// …

贡献

如果您有任何改进的想法或遇到任何错误,请通过 bugtracker 或发送电子邮件到 charles-edouard@coste.dev 告诉我。为了避免知识产权问题并确保我可以在这个代码下保持自由(作为自由)许可,贡献代码将不接受,直到我找到一个简单的贡献者许可协议流程。

许可证

AGPL

版权 (C) 2019-2021 Charles-Édouard Coste

本程序是自由软件:您可以按照自由软件基金会发布的 GNU Affero 通用公共许可证的条款重新分发和/或修改它,许可证版本为 3 或(按您的选择)任何更高版本。

本程序是在希望它有用的前提下分发的,但没有任何保证;甚至没有关于适销性或特定用途的隐含保证。有关详细信息,请参阅 GNU Affero 通用公共许可证。

您应该已经收到了本程序的 GNU Affero 通用公共许可证副本。如果没有,请参阅 https://gnu.ac.cn/licenses/