koco/messenger-kafka

Symfony Messenger Kafka 传输

安装次数: 659,656

依赖项: 1

建议者: 3

安全性: 0

星标: 84

关注者: 5

分支: 35

开放性问题: 16

类型:symfony-bundle

v0.18 2024-04-30 14:27 UTC

README

License Packagist Maintainability CircleCI Tests

此包旨在为 Symfony Messenger 提供简单的 Kafka 传输。即将推出 Kafka REST 代理支持。

安装

使用 Symfony Flex 的应用程序

打开命令行控制台,进入您的项目目录并执行

$ composer require koco/messenger-kafka

不使用 Symfony Flex 的应用程序

添加 composer 依赖项后,将包添加到项目中 config/bundles.php 文件中注册的包列表以启用该包

return [
    // ...
    Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];

配置

DSN

指定以 kafka://kafka+ssl:// 开头的 DSN。多个代理通过 , 分隔。

  • kafka://my-local-kafka:9092
  • kafka+ssl://my-staging-kafka:9093
  • kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093

示例

有关 kafka_conftopic_conf 的配置选项,请参阅此处。强烈建议对于消费者将 enable.auto.offset.store 设置为 false。否则,无论消息处理程序抛出任何错误,每条消息都将被确认。

framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    flushTimeout: 10000
                    flushRetries: 5
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'

序列化器

您很可能会想实现自己的序列化器。请参阅:https://symfony.ac.cn/doc/current/messenger.html#serializing-messages

decode()encode() 方法中,可以使用 keyheadersbody 字段。

<?php
namespace App\Infrastructure\Messenger;

use App\Catalogue\Domain\Model\Event\ProductCreated;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class MySerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $record = json_decode($encodedEnvelope['body'], true);

        return new Envelope(new ProductCreated(
            $record['id'],
            $record['name'],
            $record['description'],
        ));
    }

    public function encode(Envelope $envelope): array
    {
        /** @var ProductCreated $event */
        $event = $envelope->getMessage();
        
        return [
            'key' => $event->getId(),
            'headers' => [],
            'body' => json_encode([
                'id' => $event->getId(),
                'name' => $event->getName(),
                'description' => $event->getDescription(),
            ]),
        ];
    }

}

我如何与 Avro 一起工作?

与上面基本示例相同,您需要构建自己的序列化器。在 decode()encode() 中,您可以使用 flix-tech/avro-serde-php

关于 Confluent Schema Registry 呢?

要连接到 Schema Registry 并控制各种设置,您可以使用此包

$ composer require koco/avro-regy

并将其配置为与您的设置相匹配

avro_regy:
  base_uri: '%env(SCHEMA_REGISTRY_URL)%'
  file_naming_strategy: subject
  options:
    register_missing_schemas: true
    register_missing_subjects: true
  serializers:
    catalogue:
      schema_dir: '%kernel.project_dir%/src/Catalogue/Domain/Model/Event/Avro/'
    orders:
      schema_dir: '%kernel.project_dir%/src/Orders/Domain/Model/Event/Avro/'
      file_naming_strategy: qualified_name
      options:
        register_missing_schemas: false
        register_missing_subjects: false

请参阅 https://github.com/KonstantinCodes/avro-regy 获取完整文档。