simpod / kafka-bundle
Kafka Symfony 扩展包。
0.6.3
2023-12-08 12:38 UTC
Requires
- php: ^8.2
- simpod/kafka: ^0.2.0
- symfony/config: ^6.4 || ^7.0
- symfony/console: ^6.4 || ^7.0
- symfony/contracts: ^3.0
- symfony/dependency-injection: ^6.4 || ^7.0
- symfony/framework-bundle: ^6.4 || ^7.0
- thecodingmachine/safe: ^2
Requires (Dev)
- doctrine/coding-standard: ^12.0
- infection/infection: ^0.27.0
- kwn/php-rdkafka-stubs: ^2.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^1.0.0
- phpstan/phpstan-phpunit: ^1.0.0
- phpstan/phpstan-strict-rules: ^1.0.0
- phpunit/phpunit: ^10.3
- psalm/plugin-phpunit: ^0.18.4
- roave/infection-static-analysis-plugin: ^1.7
- symfony/yaml: ^7.0
- thecodingmachine/phpstan-safe-rule: ^1.0
- vimeo/psalm: ^5.8
Suggests
- kwn/php-rdkafka-stubs: Support and autocompletion for RDKafka in IDE | require as dev dependency
README
安装
将Composer作为依赖项添加
composer require simpod/kafka-bundle
然后将KafkaBundle
添加到Symfony的bundles.php
use SimPod\KafkaBundle\SimPodKafkaBundle; return [ ... new SimPodKafkaBundle() ... ];
用法
此包简化了将https://github.com/arnaud-lb/php-rdkafka与Symfony集成的过程。有关如何在PHP中使用Kafka的更多详细信息,请参阅其文档。
可用的控制台命令
bin/console debug:kafka:consumers
列出所有可用的消费者组bin/console kafka:consumer:run <consumer name>
运行消费者实例
配置
您可以在配置目录中创建一个名为kafka.yaml
的文件,其内容如下
kafka: authentication: '%env(KAFKA_AUTHENTICATION)%' bootstrap_servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%' client: id: 'your-application-name'
authentication
读取环境变量KAFKA_AUTHENTICATION
,它包含认证URI(sasl-plain://user:password
,或者它可能是空的,表示没有认证)。bootstrap_servers
读取环境变量KAFKA_BOOTSTRAP_SERVERS
,它包含逗号分隔的引导服务器列表(broker-1.kafka:9092,broker-2.kafka:9092
)。
如果未设置bootstrap_servers
,则默认为127.0.0.1:9092
服务
以下服务已在容器中注册,并可进行依赖注入。
配置
class: \SimPod\KafkaBundle\Kafka\Configuration
配置服务允许轻松访问所有配置属性。
$config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getIdWithHostname());
消费
有一个可用的NamedConsumer
接口。当您的消费者实现该接口时,此包会自动注册它。
这是一个简单的消费者示例,然后可以通过bin/console kafka:consumer:run consumer1
运行
<?php declare(strict_types=1); namespace Your\AppNamespace; use SimPod\Kafka\Clients\Consumer\ConsumerConfig; use SimPod\Kafka\Clients\Consumer\KafkaConsumer; use SimPod\KafkaBundle\Kafka\Configuration; use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer; final class ExampleKafkaConsumer implements NamedConsumer { private Configuration $configuration; public function __construct(Configuration $configuration) { $this->configuration = $configuration; } public function run(): void { $kafkaConsumer = new KafkaConsumer($this->getConfig()); $kafkaConsumer->subscribe(['topic1']); while (true) { ... } } public function getName(): string { return 'consumer1'; } private function getConfig(): ConsumerConfig { $config = new ConsumerConfig(); $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers()); $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false); $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname()); $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest'); $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group'); return $config; } }
开发
已将kwn/php-rdkafka-stubs
列为开发依赖项,以便正确地将php-rdkafka扩展与IDE集成。