koco / messenger-kafka
Symfony Messenger Kafka 传输
v0.18
2024-04-30 14:27 UTC
Requires
- php: ^7.4|8.*
- ext-json: *
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0
- psr/log: ^1.0.1||^2.0 ||^3.0
- symfony/config: ^3.0||^4.0||^5.0||^6.0||7.*
- symfony/dependency-injection: ^3.4.26||^4.1.12|^5.0||^6.0||7.*
- symfony/http-kernel: ^3.0||^4.0||^5.0||^6.0||7.*
- symfony/messenger: ^4.4||^5.0||^6.0||7.*
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.6
- kwn/php-rdkafka-stubs: ^2.1
- nyholm/psr7: ^1.4
- phpstan/phpstan: ^1.4.6
- symfony/framework-bundle: ^5.0||^6.0
- symfony/phpunit-bridge: ^5.0||^6.0
- symfony/property-access: ^5.0||^6.0
- symfony/serializer: ^5.0||^6.0
Suggests
- ext-rdkafka: ^4.0; Needed to support Kafka connectivity
- koco/avro-regy: Confluent Schema Registry integration
This package is auto-updated.
Last update: 2024-08-30 15:14:12 UTC
README
此包旨在为 Symfony Messenger 提供简单的 Kafka 传输。即将推出 Kafka REST 代理支持。
安装
使用 Symfony Flex 的应用程序
打开命令行控制台,进入您的项目目录并执行
$ composer require koco/messenger-kafka
不使用 Symfony Flex 的应用程序
添加 composer 依赖项后,将包添加到项目中 config/bundles.php
文件中注册的包列表以启用该包
return [ // ... Koco\Kafka\KocoKafkaBundle::class => ['all' => true], ];
配置
DSN
指定以 kafka://
或 kafka+ssl://
开头的 DSN。多个代理通过 ,
分隔。
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
示例
有关 kafka_conf
和 topic_conf
的配置选项,请参阅此处。强烈建议对于消费者将 enable.auto.offset.store
设置为 false
。否则,无论消息处理程序抛出任何错误,每条消息都将被确认。
framework: messenger: transports: producer: dsn: '%env(KAFKA_URL)%' # serializer: App\Infrastructure\Messenger\MySerializer options: flushTimeout: 10000 flushRetries: 5 topic: name: 'events' kafka_conf: security.protocol: 'sasl_ssl' ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem' sasl.username: '%env(KAFKA_SASL_USERNAME)%' sasl.password: '%env(KAFKA_SASL_PASSWORD)%' sasl.mechanisms: 'SCRAM-SHA-256' consumer: dsn: '%env(KAFKA_URL)%' # serializer: App\Infrastructure\Messenger\MySerializer options: commitAsync: true receiveTimeout: 10000 topic: name: "events" kafka_conf: enable.auto.offset.store: 'false' group.id: 'my-group-id' # should be unique per consumer security.protocol: 'sasl_ssl' ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem' sasl.username: '%env(KAFKA_SASL_USERNAME)%' sasl.password: '%env(KAFKA_SASL_PASSWORD)%' sasl.mechanisms: 'SCRAM-SHA-256' max.poll.interval.ms: '45000' topic_conf: auto.offset.reset: 'earliest'
序列化器
您很可能会想实现自己的序列化器。请参阅:https://symfony.ac.cn/doc/current/messenger.html#serializing-messages
在 decode()
和 encode()
方法中,可以使用 key
、headers
和 body
字段。
<?php namespace App\Infrastructure\Messenger; use App\Catalogue\Domain\Model\Event\ProductCreated; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; final class MySerializer implements SerializerInterface { public function decode(array $encodedEnvelope): Envelope { $record = json_decode($encodedEnvelope['body'], true); return new Envelope(new ProductCreated( $record['id'], $record['name'], $record['description'], )); } public function encode(Envelope $envelope): array { /** @var ProductCreated $event */ $event = $envelope->getMessage(); return [ 'key' => $event->getId(), 'headers' => [], 'body' => json_encode([ 'id' => $event->getId(), 'name' => $event->getName(), 'description' => $event->getDescription(), ]), ]; } }
我如何与 Avro 一起工作?
与上面基本示例相同,您需要构建自己的序列化器。在 decode()
和 encode()
中,您可以使用 flix-tech/avro-serde-php。
关于 Confluent Schema Registry 呢?
要连接到 Schema Registry 并控制各种设置,您可以使用此包
$ composer require koco/avro-regy
并将其配置为与您的设置相匹配
avro_regy: base_uri: '%env(SCHEMA_REGISTRY_URL)%' file_naming_strategy: subject options: register_missing_schemas: true register_missing_subjects: true serializers: catalogue: schema_dir: '%kernel.project_dir%/src/Catalogue/Domain/Model/Event/Avro/' orders: schema_dir: '%kernel.project_dir%/src/Orders/Domain/Model/Event/Avro/' file_naming_strategy: qualified_name options: register_missing_schemas: false register_missing_subjects: false
请参阅 https://github.com/KonstantinCodes/avro-regy 获取完整文档。