m6web / kafka-bundle
此包已被弃用且不再维护。没有建议的替代包。
基于rdkafka扩展的Kafka包
v0.10.3
2017-06-26 16:11 UTC
Requires
- php: >=7.0
- ext-rdkafka: *
- kwn/php-rdkafka-stubs: ^0.0.5
- symfony/dependency-injection: ~2.6||~3.0
Requires (Dev)
- atoum/atoum: ~3.0
- atoum/stubs: ~2.5
- m6web/coke: ~2.1
- m6web/symfony2-coding-standard: ~3.1
- symfony/config: ~3.1
- symfony/event-dispatcher: ~3.1
- symfony/expression-language: ~3.1
- symfony/yaml: ~3.1
README
KafkaBundle的配置和使用基于RdKafka扩展。为了消费消息,我们决定使用高级消费者。
安装
针对Symfony
{
"require": {
"m6web/kafka-bundle": "~0.1",
}
}
注册包
// app/AppKernel.php public function registerBundles() { $bundles = array( new M6Web\Bundle\KafkaBundle\M6WebKafkaBundle(), ); }
安装包
$ composer update m6web/kafka-bundle
使用
在配置文件中添加m6_web_kafka
部分。
默认情况下,sf3事件调度器将在每个命令上抛出事件。要禁用此功能
m6_web_kafka: event_dispatcher: false
以下是一个配置示例
m6_web_kafka: event_dispatcher: true producers: producer1: configuration: timeout.ms: 1000 queue.buffering.max.ms: 0 # Maximum time, in milliseconds, for buffering data on the producer queue. 1000ms by default. brokers: - '127.0.0.1' - '10.05.05.19' log_level: 3 events_poll_timeout: 2000 #ms topics: batman: configuration: retries: 3 strategy_partition: 2 catwoman: configuration: retries: 3 strategy_partition: 2 consumers: consumer1: configuration: metadata.broker.list: '127.0.0.1' group.id: 'myConsumerGroup' enable.auto.commit: 0 topicConfiguration: auto.offset.reset: 'smallest' timeout_consuming_queue: 200 topics: - batman - catwoman
注意,我们决定使用高级消费者。因此,您可以在消费者配置中设置“group.id”选项。
configuration: metadata.broker.list: '127.0.0.1' group.id: 'myConsumerGroup'
对于生产者,我们为每个主题有一个主题配置
topics: batman: configuration: retries: 3 strategy_partition: '2' catwoman: configuration: retries: 3 strategy_partition: '2'
而对于消费者,我们为所有主题有一个主题配置
topicConfiguration: auto.offset.reset: 'smallest' timeout_consuming_queue: 200 topics: - batman - catwoman
生产者
生产者用于向服务器发送消息。
在Kafka模型中,消息被发送到主题,这些主题在代理上分区和存储。这意味着在生产者的配置中,您必须指定代理和主题。您可以配置日志级别和分区策略。
由于RdKafka扩展的限制,您不能从包中配置分区数或复制因子。您必须从命令行进行此操作。
设置好您的生产者和选项后,您将能够使用produce
方法发送消息
$producer->produce('message', RD_KAFKA_PARTITION_UA, '12345');
- 第一个参数是要发送的消息。
- 第二个参数是要生产消息的分区。默认值是
RD_KAFKA_PARTITION_UA
,这意味着消息将被发送到随机分区。 - 第三个参数是如果策略分区器是按键的话,它是一个键。
使用RD_KAFKA_PARTITION_UA
常量根据策略分区器。
- 如果策略分区器是
random
(RD_KAFKA_MSG_PARTITIONER_RANDOM
),消息将被随机分配到分区。 - 如果策略分区器是
consistent
(RD_KAFKA_MSG_PARTITIONER_CONSISTENT
)并且定义了键,则消息将被分配到ID映射键的哈希的分区。如果没有定义键,则消息将被分配到相同的分区。
消费者
消费者用于从不同的主题中获取消息。您可以选择通过消费者设置仅一个主题。
在Kafka模型中,消息从主题和代理上分区和存储中被消费。这意味着对于消费者,您必须在配置中指定代理和主题。
要消费消息,您必须使用consume
方法来消费消息
$consumer->consume();
消息将被自动提交,除非出现错误。但您可以通过添加以下参数来选择不提交:
$consumer->consume(false);
您可以选择使用以下方法手动提交消息:
$consumer->commit();
它将提交最后消费的消息。
它将返回一个包含有关消息信息的对象 \RdKafka\Message
:例如,负载、主题或分区。它是来自 RdKafka 扩展 的 \RdKafka\Message
。
如果没有更多消息,它将返回一个 没有更多消息 字符串。如果超时,它将返回一个 超时 字符串。
异常列表
- EntityNotSetException
- KafkaException
- LogLevelNotSetException
- NoBrokerSetException