simpod/kafka-bundle

Kafka Symfony 扩展包。

0.6.3 2023-12-08 12:38 UTC

This package is auto-updated.

Last update: 2024-08-28 03:44:34 UTC


README

GitHub Actions Code Coverage Downloads Packagist Infection MSI

安装

Composer作为依赖项添加

composer require simpod/kafka-bundle

然后将KafkaBundle添加到Symfony的bundles.php

use SimPod\KafkaBundle\SimPodKafkaBundle;

return [
    ...
    new SimPodKafkaBundle()
    ...
];

用法

此包简化了将https://github.com/arnaud-lb/php-rdkafka与Symfony集成的过程。有关如何在PHP中使用Kafka的更多详细信息,请参阅其文档。

可用的控制台命令

  • bin/console debug:kafka:consumers列出所有可用的消费者组
  • bin/console kafka:consumer:run <consumer name>运行消费者实例

配置

您可以在配置目录中创建一个名为kafka.yaml的文件,其内容如下

kafka:
    authentication: '%env(KAFKA_AUTHENTICATION)%'
    bootstrap_servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%'
    client:
        id: 'your-application-name'
  • authentication读取环境变量KAFKA_AUTHENTICATION,它包含认证URI(sasl-plain://user:password,或者它可能是空的,表示没有认证)。
  • bootstrap_servers读取环境变量KAFKA_BOOTSTRAP_SERVERS,它包含逗号分隔的引导服务器列表(broker-1.kafka:9092,broker-2.kafka:9092)。

如果未设置bootstrap_servers,则默认为127.0.0.1:9092

服务

以下服务已在容器中注册,并可进行依赖注入。

配置

class: \SimPod\KafkaBundle\Kafka\Configuration

配置服务允许轻松访问所有配置属性。

$config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getIdWithHostname());

消费

有一个可用的NamedConsumer接口。当您的消费者实现该接口时,此包会自动注册它。

这是一个简单的消费者示例,然后可以通过bin/console kafka:consumer:run consumer1运行

<?php

declare(strict_types=1);

namespace Your\AppNamespace;

use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
use SimPod\KafkaBundle\Kafka\Configuration;
use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer;

final class ExampleKafkaConsumer implements NamedConsumer
{
    private Configuration $configuration;

    public function __construct(Configuration $configuration)
    {
        $this->configuration = $configuration;
    }

    public function run(): void
    {
        $kafkaConsumer = new KafkaConsumer($this->getConfig());

        $kafkaConsumer->subscribe(['topic1']);

        while (true) {
            ...
        }
    }
    
    public function getName(): string {
        return 'consumer1';    
    }

    private function getConfig(): ConsumerConfig
    {
        $config = new ConsumerConfig();

        $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers());
        $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
        $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname());
        $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
        $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group');

        return $config;
    }
}

开发

已将kwn/php-rdkafka-stubs列为开发依赖项,以便正确地将php-rdkafka扩展与IDE集成。