akson/messenger-kafka

Symfony Messenger Kafka Transport

安装次数: 1,962

依赖关系: 0

建议者: 0

安全性: 0

星星: 0

关注者: 0

分支: 35

类型:symfony-bundle

v0.13.2 2022-08-26 11:12 UTC

README

License Packagist Maintainability CircleCI Tests

此包旨在为Symfony Messenger提供一个简单的Kafka传输。Kafka REST Proxy支持即将推出。

安装

使用Symfony Flex的应用程序

打开命令行,进入您的项目目录并执行

$ composer require koco/messenger-kafka

未使用Symfony Flex的应用程序

在添加composer需求后,通过将其添加到项目中config/bundles.php文件中注册的包列表来启用包

return [
    // ...
    Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];

配置

DSN

指定以kafka://kafka+ssl://开头的DSN。多个代理之间用,分隔。

  • kafka://my-local-kafka:9092
  • kafka+ssl://my-staging-kafka:9093
  • kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093

示例

kafka_conftopic_conf的配置选项可以在这里找到。强烈建议为消费者设置enable.auto.offset.storefalse。否则,无论消息处理器抛出什么错误,每条消息都将被确认。

framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    flushTimeout: 10000
                    flushRetries: 5
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'

消息格式

您可能需要实现自己的序列化器。请参阅:https://symfony.com.cn/doc/current/messenger.html#serializing-messages

<?php
namespace App\Infrastructure\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class MySerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
    }

    public function encode(Envelope $envelope): array
    {
        // ...
    }

}

如何使用Avro?

与上面基本示例中的用法相同,您需要构建自己的序列化器。在decode()encode()中,您可以使用flix-tech/avro-serde-php