akson / messenger-kafka
Symfony Messenger Kafka Transport
v0.13.2
2022-08-26 11:12 UTC
Requires
- php: ^7.1.3|8.*
- ext-json: *
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0
- psr/log: ^1.1
- symfony/config: ^3.0||^4.0||^5.0
- symfony/dependency-injection: ^3.4.26||^4.1.12|^5.0
- symfony/http-kernel: ^3.0||^4.0||^5.0
- symfony/messenger: ^4.4||^5.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.16
- kwn/php-rdkafka-stubs: ^2.0
- nyholm/psr7: ^1.3
- phpstan/phpstan: ^0.12.52
- symfony/framework-bundle: ^5.0
- symfony/phpunit-bridge: ^5.0
- symfony/property-access: ^5.0
- symfony/serializer: ^5.0
Suggests
- ext-rdkafka: ^4.0; Needed to support Kafka connectivity
- symfony/http-client: ^5.1; Needed to support Kafka REST Proxy
This package is not auto-updated.
Last update: 2024-09-20 21:42:38 UTC
README
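This package aims to provide a simple Kafka transport for Symfony Messenger. Kafka REST Proxy support is coming soon.
Installation
Applications that use Symfony Flex
Open a command console, enter your project directory and execute: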
$ composer require koco/messenger-kafka
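Applications that don't use Symfony Flex
After adding the composer requirement, enable the bundle by adding it to the list of registered bundles in your project's config/bundles.php file:
    return [
        // ...
        Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
    ];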
Configuration
DSN
Specify a DSN starting with kafka:// or kafka+ssl://. Separate multiple brokers with a comma (,).
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
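To make the DSN format concrete, here is a minimal sketch of how such a string could be split into a broker list. The function name parseKafkaDsn is hypothetical and this is not the package's actual parser; it assumes only the rules stated above (a kafka:// or kafka+ssl:// scheme, brokers separated by commas).

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this package): splits a DSN such as
 * "kafka+ssl://a:9093,kafka+ssl://b:9093" into host:port pairs.
 *
 * @return array{brokers: string[], ssl: bool}
 */
function parseKafkaDsn(string $dsn): array
{
    $brokers = [];
    $ssl = false;

    foreach (explode(',', $dsn) as $part) {
        $part = trim($part);
        if (strpos($part, 'kafka+ssl://') === 0) {
            // Any kafka+ssl:// entry marks the connection as SSL.
            $ssl = true;
            $brokers[] = substr($part, strlen('kafka+ssl://'));
        } elseif (strpos($part, 'kafka://') === 0) {
            $brokers[] = substr($part, strlen('kafka://'));
        } else {
            throw new InvalidArgumentException(sprintf('Invalid Kafka DSN part "%s".', $part));
        }
    }

    return ['brokers' => $brokers, 'ssl' => $ssl];
}
```

Joining the result with implode(',', $result['brokers']) gives a comma-separated host:port list, which is the usual form for a Kafka bootstrap server setting.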
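Example
The configuration options for kafka_conf and topic_conf can be found in the librdkafka configuration reference. It is highly recommended to set enable.auto.offset.store to false for consumers. Otherwise, every message will be acknowledged, regardless of any error thrown by the message handlers.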
    framework:
        messenger:
            transports:
                producer:
                    dsn: '%env(KAFKA_URL)%'
                    # serializer: App\Infrastructure\Messenger\MySerializer
                    options:
                        flushTimeout: 10000
                        flushRetries: 5
                        topic:
                            name: 'events'
                        kafka_conf:
                            security.protocol: 'sasl_ssl'
                            ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                            sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                            sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                            sasl.mechanisms: 'SCRAM-SHA-256'
                consumer:
                    dsn: '%env(KAFKA_URL)%'
                    # serializer: App\Infrastructure\Messenger\MySerializer
                    options:
                        commitAsync: true
                        receiveTimeout: 10000
                        topic:
                            name: "events"
                        kafka_conf:
                            enable.auto.offset.store: 'false'
                            group.id: 'my-group-id' # should be unique per consumer
                            security.protocol: 'sasl_ssl'
                            ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                            sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                            sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                            sasl.mechanisms: 'SCRAM-SHA-256'
                            max.poll.interval.ms: '45000'
                        topic_conf:
                            auto.offset.reset: 'earliest'
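The recommendation about enable.auto.offset.store can be illustrated with a toy in-memory model. This is not real Kafka or rdkafka code, and simulateConsume is a hypothetical name; the sketch only assumes that with automatic offset storing the offset is recorded as soon as a message is handed to the application, whereas with manual storing it is recorded only after the handler succeeds.

```php
<?php

declare(strict_types=1);

/**
 * Toy model, not real Kafka code: returns the highest offset that would be
 * stored (and later committed) after consuming $messages.
 *
 * @param array<int, string> $messages offset => payload
 */
function simulateConsume(array $messages, callable $handler, bool $autoOffsetStore): int
{
    $storedOffset = -1; // -1 means "nothing stored yet"

    foreach ($messages as $offset => $payload) {
        if ($autoOffsetStore) {
            // Auto mode: the offset is stored as soon as the message is handed over.
            $storedOffset = $offset;
        }

        try {
            $handler($payload);
        } catch (Throwable $e) {
            // Handler failed: stop consuming without storing this offset manually.
            break;
        }

        if (!$autoOffsetStore) {
            // Manual mode: store the offset only after successful handling.
            $storedOffset = $offset;
        }
    }

    return $storedOffset;
}

$messages = [0 => 'ok', 1 => 'boom', 2 => 'ok'];
$handler = function (string $payload): void {
    if ($payload === 'boom') {
        throw new RuntimeException('handler failed');
    }
};

$withAutoStore = simulateConsume($messages, $handler, true);    // failed message counts as done
$withManualStore = simulateConsume($messages, $handler, false); // failed message stays unacknowledged
```

In this model the automatic mode stores the offset of the failed message, so it would not be redelivered, while the manual mode stops at the last successfully handled offset.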
Message format
You will most likely need to implement your own serializer. See: https://symfony.com.cn/doc/current/messenger.html#serializing-messages
    <?php

    namespace App\Infrastructure\Messenger;

    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

    final class MySerializer implements SerializerInterface
    {
        public function decode(array $encodedEnvelope): Envelope
        {
            // ...
        }

        public function encode(Envelope $envelope): array
        {
            // ...
        }
    }
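As a possible starting point for the elided method bodies, the sketch below shows a plain JSON mapping for a single message type. The UserRegistered class, the 'user.registered' type header, and both function names are hypothetical and exist only for illustration. The one thing assumed about Symfony Messenger is that an encoded envelope is an array with a 'body' string and an optional 'headers' array.

```php
<?php

declare(strict_types=1);

// Hypothetical message class, used only for illustration.
final class UserRegistered
{
    private $userId;
    private $email;

    public function __construct(int $userId, string $email)
    {
        $this->userId = $userId;
        $this->email = $email;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }

    public function getEmail(): string
    {
        return $this->email;
    }
}

/** Builds the array shape that encode() is expected to return. */
function encodeUserRegistered(UserRegistered $message): array
{
    return [
        'body' => json_encode(['userId' => $message->getUserId(), 'email' => $message->getEmail()]),
        'headers' => ['type' => 'user.registered'],
    ];
}

/** Rebuilds the message from the array shape that decode() receives. */
function decodeUserRegistered(array $encodedEnvelope): UserRegistered
{
    $data = json_decode($encodedEnvelope['body'] ?? '', true);

    // Reject anything that is not a JSON object with both expected keys.
    if (!is_array($data) || !isset($data['userId'], $data['email'])) {
        throw new InvalidArgumentException('Malformed user.registered payload.');
    }

    return new UserRegistered((int) $data['userId'], (string) $data['email']);
}
```

Inside MySerializer, encode() could then return encodeUserRegistered($envelope->getMessage()), and decode() could return new Envelope(decodeUserRegistered($encodedEnvelope)). In a real serializer it may be preferable to throw Symfony's MessageDecodingFailedException instead of InvalidArgumentException for payloads that cannot be decoded.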
How do I work with Avro?
As in the basic example above, you need to build your own serializer. Inside decode() and encode() you can use flix-tech/avro-serde-php.