一氧化碳 / kafka-bundle
Symfony 框架的 Kafka Bundle
Requires
- php: ^7.1
- ext-json: *
- ext-rdkafka: ^3.1.2
- enqueue/rdkafka: ^0.10.3
This package is not auto-updated.
Last update: 2024-09-25 21:53:14 UTC
README
安装
在你的应用程序中创建配置文件
# config/packages/kafka.yaml kafka: __client_name__: ## configuration: Acme\Configuration configuration: global: group.id: 'some-group' metadata.broker.list: 'kafka:9092' enable.auto.commit: 'true' topic: auto.offset.reset: latest serializer: Enqueue\RdKafka\JsonSerializer logger: Acme\Logger
其中 __client_name__
是你的客户端名称,你可以将其更改为你想要的任何名称。它将被用于在订阅 Kafka 主题时指定连接,在执行 ./bin/console kafka:consume
命令时。
在 configuration
字段中描述了与 Kafka 连接的配置。有关配置的详细信息,请参阅此处 -> https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。此外,你可以传递一个实现接口 D1oxyde\KafkaBundle\Configuration
的类。如果需要动态计算值或从外部系统获取连接数据,这很有用。
在 serializer
中传递一个类,该类用于将来自主题的消息反序列化,并在发送到主题时序列化。它必须实现接口 Enqueue\RdKafka\Serializer
。
在 logger
中传递一个类,该类必须实现接口 D1oxyde\KafkaBundle\Logger
。它记录错误和消息的成功投递。
将包连接到项目(config/bundles.php
)
<?php return [ /* ... */ D1oxyde\KafkaBundle\KafkaBundle::class => ['all' => true], ];
实现
处理器
要订阅 Kafka 主题,需要创建一个处理器(类)并实现接口 D1oxyde\KafkaBundle\Processor
<?php use D1oxyde\KafkaBundle\Processor; use Enqueue\RdKafka\RdKafkaContext; use Enqueue\RdKafka\RdKafkaMessage; class SomeProcessor implements Processor { public function process(RdKafkaMessage $message, RdKafkaContext $context): string { echo $message->getBody() . PHP_EOL; return self::ACK; } public function getTopicName(): string { return 'events'; } public static function getProcessorName(): string { return 'some-processor'; } }
在 process
方法中,第一个参数是来自 Kafka 的消息对象,第二个参数是 Kafka 连接的上下文。然后它必须返回 self::ACK
、self::REJECT
或 self::REQUEUE
。
getTopicName
方法返回处理器订阅的主题名称。
静态方法 getProcessorName
返回处理器的名称。
注册处理器
# config/services.yaml services: Acme\Kafka\SomeProcessor: tags: - { name: 'kafka.processor' }
生产者
生产者可以通过标签 kafka.internal.producer
访问,实现为类 D1oxyde\KafkaBundle\Producer
。要向 Kafka 发送消息,需要调用方法 produce
并传递消息集合 RdKafkaMessage
和主题名称。
启动
./bin/console kafka:consume client-name processor-name
第一个参数是连接名称,该名称在配置文件中定义,第二个参数是处理器名称,由处理器中的 getProcessorName
方法确定。