一氧化碳/kafka-bundle

Symfony 框架的 Kafka Bundle

安装: 0

依赖者: 0

建议者: 0

安全: 0

星标: 0

关注者: 1

分支: 1

开放问题: 0

类型:symfony-bundle

1.2.0 2021-01-18 19:43 UTC

This package is not auto-updated.

Last update: 2024-09-25 21:53:14 UTC


README

安装

在你的应用程序中创建配置文件

# config/packages/kafka.yaml
kafka:
  __client_name__:
    ## configuration: Acme\Configuration
    configuration:
      global:
        group.id: 'some-group'
        metadata.broker.list: 'kafka:9092'
        enable.auto.commit: 'true'
      topic:
        auto.offset.reset: latest
    serializer: Enqueue\RdKafka\JsonSerializer
    logger: Acme\Logger

其中 __client_name__ 是你的客户端名称,你可以将其更改为你想要的任何名称。它将被用于在订阅 Kafka 主题时指定连接,在执行 ./bin/console kafka:consume 命令时。

configuration 字段中描述了与 Kafka 连接的配置。有关配置的详细信息,请参阅此处 -> https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。此外,你可以传递一个实现接口 D1oxyde\KafkaBundle\Configuration 的类。如果需要动态计算值或从外部系统获取连接数据,这很有用。

serializer 中传递一个类,该类用于将来自主题的消息反序列化,并在发送到主题时序列化。它必须实现接口 Enqueue\RdKafka\Serializer

logger 中传递一个类,该类必须实现接口 D1oxyde\KafkaBundle\Logger。它记录错误和消息的成功投递。

将包连接到项目(config/bundles.php

<?php

return [
    /* ... */
    D1oxyde\KafkaBundle\KafkaBundle::class => ['all' => true],
];

实现

处理器

要订阅 Kafka 主题,需要创建一个处理器(类)并实现接口 D1oxyde\KafkaBundle\Processor

<?php

use D1oxyde\KafkaBundle\Processor;
use Enqueue\RdKafka\RdKafkaContext;
use Enqueue\RdKafka\RdKafkaMessage;

class SomeProcessor implements Processor
{
    public function process(RdKafkaMessage $message, RdKafkaContext $context): string
    {
        echo $message->getBody() . PHP_EOL;

        return self::ACK;
    }

    public function getTopicName(): string
    {
        return 'events';
    }

    public static function getProcessorName(): string
    {
        return 'some-processor';
    }
}

process 方法中,第一个参数是来自 Kafka 的消息对象,第二个参数是 Kafka 连接的上下文。然后它必须返回 self::ACKself::REJECTself::REQUEUE

getTopicName 方法返回处理器订阅的主题名称。

静态方法 getProcessorName 返回处理器的名称。

注册处理器

# config/services.yaml
services:
  Acme\Kafka\SomeProcessor:
    tags:
      - { name: 'kafka.processor' }

生产者

生产者可以通过标签 kafka.internal.producer 访问,实现为类 D1oxyde\KafkaBundle\Producer。要向 Kafka 发送消息,需要调用方法 produce 并传递消息集合 RdKafkaMessage 和主题名称。

启动

./bin/console kafka:consume client-name processor-name

第一个参数是连接名称,该名称在配置文件中定义,第二个参数是处理器名称,由处理器中的 getProcessorName 方法确定。