m6web/kafka-bundle

此包已被弃用且不再维护。没有建议的替代包。

基于rdkafka扩展的Kafka包

安装次数: 3,472

依赖项: 0

建议者: 0

安全: 0

星标: 8

关注者: 41

分支: 6

类型:symfony-bundle

v0.10.3 2017-06-26 16:11 UTC

This package is auto-updated.

Last update: 2021-10-12 14:42:07 UTC


README

KafkaBundle的配置和使用基于RdKafka扩展。为了消费消息,我们决定使用高级消费者。

Kafka文档

安装

针对Symfony

{
    "require": {
        "m6web/kafka-bundle": "~0.1",
    }
}

注册包

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new M6Web\Bundle\KafkaBundle\M6WebKafkaBundle(),
    );
}

安装包

$ composer update m6web/kafka-bundle

使用

在配置文件中添加m6_web_kafka部分。

默认情况下,sf3事件调度器将在每个命令上抛出事件。要禁用此功能

m6_web_kafka:
   event_dispatcher: false

以下是一个配置示例

Librdkafka全局配置属性

m6_web_kafka:
    event_dispatcher: true
    producers:
       producer1:
           configuration:
               timeout.ms: 1000
               queue.buffering.max.ms: 0 # Maximum time, in milliseconds, for buffering data on the producer queue. 1000ms by default. 
           brokers:
               - '127.0.0.1'
               - '10.05.05.19'
           log_level: 3
           events_poll_timeout: 2000 #ms
           topics:
               batman:
                   configuration:
                       retries: 3
                   strategy_partition: 2
               catwoman:
                   configuration:
                       retries: 3
                   strategy_partition: 2

    consumers:
        consumer1:
            configuration:
                metadata.broker.list: '127.0.0.1'
                group.id: 'myConsumerGroup'
                enable.auto.commit: 0
            topicConfiguration:
                auto.offset.reset: 'smallest'
            timeout_consuming_queue: 200
            topics:
                - batman
                - catwoman

注意,我们决定使用高级消费者。因此,您可以在消费者配置中设置“group.id”选项。

configuration:
  metadata.broker.list: '127.0.0.1'
  group.id: 'myConsumerGroup'

对于生产者,我们为每个主题有一个主题配置

 topics:
   batman:
       configuration:
           retries: 3
       strategy_partition: '2'
   catwoman:
       configuration:
           retries: 3
       strategy_partition: '2'

而对于消费者,我们为所有主题有一个主题配置

topicConfiguration:
    auto.offset.reset: 'smallest'
timeout_consuming_queue: 200
topics:
    - batman
    - catwoman

生产者

生产者用于向服务器发送消息。

在Kafka模型中,消息被发送到主题,这些主题在代理上分区和存储。这意味着在生产者的配置中,您必须指定代理和主题。您可以配置日志级别和分区策略。

由于RdKafka扩展的限制,您不能从包中配置分区数或复制因子。您必须从命令行进行此操作。

设置好您的生产者和选项后,您将能够使用produce方法发送消息

$producer->produce('message', RD_KAFKA_PARTITION_UA, '12345');
  • 第一个参数是要发送的消息。
  • 第二个参数是要生产消息的分区。默认值是RD_KAFKA_PARTITION_UA,这意味着消息将被发送到随机分区。
  • 第三个参数是如果策略分区器是按键的话,它是一个键。

使用RD_KAFKA_PARTITION_UA常量根据策略分区器。

  • 如果策略分区器是randomRD_KAFKA_MSG_PARTITIONER_RANDOM),消息将被随机分配到分区。
  • 如果策略分区器是consistentRD_KAFKA_MSG_PARTITIONER_CONSISTENT)并且定义了键,则消息将被分配到ID映射键的哈希的分区。如果没有定义键,则消息将被分配到相同的分区。

消费者

消费者用于从不同的主题中获取消息。您可以选择通过消费者设置仅一个主题。

在Kafka模型中,消息从主题和代理上分区和存储中被消费。这意味着对于消费者,您必须在配置中指定代理和主题。

要消费消息,您必须使用consume方法来消费消息

$consumer->consume();

消息将被自动提交,除非出现错误。但您可以通过添加以下参数来选择不提交:

$consumer->consume(false);

您可以选择使用以下方法手动提交消息:

$consumer->commit();

它将提交最后消费的消息。

它将返回一个包含有关消息信息的对象 \RdKafka\Message:例如,负载、主题或分区。它是来自 RdKafka 扩展\RdKafka\Message

如果没有更多消息,它将返回一个 没有更多消息 字符串。如果超时,它将返回一个 超时 字符串。

异常列表

  • EntityNotSetException
  • KafkaException
  • LogLevelNotSetException
  • NoBrokerSetException