sts-gaming-group/kafka-bundle

用于消费和发送Apache Kafka消息的捆绑包。

安装数: 15,996

依赖者: 0

建议者: 0

安全: 0

星级: 30

关注者: 5

分支: 6

开放问题: 1

类型:symfony-bundle

1.3.0 2023-12-14 08:40 UTC

This package is auto-updated.

Last update: 2024-09-04 08:35:05 UTC


README

kafka-bundle

技术栈

  • PHP >=7.4
  • PHP的ext-rdkafka
  • symfony组件:请参考composer.json require部分获取所需的包版本

快速开始

如果您想在您的Symfony项目中安装它

composer require sts-gaming-group/kafka-bundle

示例项目

如果您想在Symfony项目中测试此捆绑包的功能,请参阅https://github.com/sts-gaming-group/kafka-bundle-app项目,该项目包含kafka-bundle和方便的docker-compose文件。

基本配置

  1. sts_gaming_group_kafka.yaml添加到配置文件夹中,路径为config/packages/sts_gaming_group_kafka.yaml或特定的环境文件夹中,例如config/packages/prod/sts_gaming_group_kafka.yaml
  2. 例如,在sts_gaming_group_kafka.yaml中添加配置
sts_gaming_group_kafka:
 consumers:
   instances:
     App\Consumers\ExampleConsumer:
       brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ]
       schema_registry: 'http://127.0.0.1:8081'
       group_id: 'some_group_id'
       topics: [ 'some_topic' ]
 producers:    
   instances:
     App\Producers\ExampleProducer:
       brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ]    
       producer_topic: 'my_app_failed_message_topic'
  1. 通常,您会希望在yaml文件中保留您的kafka配置,但您也可以直接在CLI中传递配置,例如
bin/console kafka:consumers:consume example_consumer --group_id some_other_group_id

目前,CLI中传递的选项仅适用于由命令kafka:consumers:consume运行的消费者。

配置在运行时解析。优先级如下

  • CLI中传递的配置始终优先
  • 按消费者/生产者传递的配置(sts_gaming_group_kafka.yaml中的consumers:producers:部分的instances:部分)

消费消息

  1. 创建消费者
<?php

declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Consumer\Message;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Context;

class ExampleConsumer implements ConsumerInterface
{
    public const CONSUMER_NAME = 'example_consumer';

    public function consume(Message $message, Context $context): void
    {
        $data = $message->getData(); // contains denormalized data from Kafka
        $retryNo = $context->getRetryNo();  // contains retry count in case of a failure
    }

    public function handleException(\Exception $exception, Context $context): void
    {
        // log it or i.e. produce to retry topic based on type of exception
    }

    public function getName(): string
    {
        return self::CONSUMER_NAME; // consumer unique name in your project
    }
 }
  1. 如果配置正确(合适的代理、主题以及很可能是模式注册表),您应该能够运行您的消费者并接收消息
bin/console kafka:consumers:consume example_consumer

重试失败消息

要触发回退重试,您的消费者应在consume方法中抛出RecoverableMessageException。您还必须在sts_gaming_group_kafka.yaml中配置一些重试选项

use StsGamingGroup\KafkaBundle\Client\Consumer\Exception\RecoverableMessageException;
sts_gaming_group_kafka:
  consumers:
     ... # global consumers configurations
    instances:
      App\Consumers\ExampleConsumer:
        ... # other configurations
        max_retries: 3 # defaults to 0 which means it is disabled
        max_retry_delay: 2500 # defaults to 2000 ms
        retry_delay: 300 # defaults to 200 ms
        retry_multiplier: 3 # defaults to 2

使用此配置,您将最多收到相同消息4次(第一次消费+3次重试)。在第一次重试之前,将会有300毫秒的延迟。在第二次重试之前,将会有900毫秒的延迟(retry_delay * retry_multiplier)。在第三次重试之前,将会有2500毫秒的延迟(max_retry_delay)。重要的是要记住,在Kafka中提交偏移量,以防消息永久失败(如果enable_auto_commit设置为false)。

消费者中的任何未捕获的异常都会使消费者关闭。

处理偏移量

默认情况下,选项enable.auto.commit设置为true。在这种情况下,在消费消息后,偏移量将自动提交到Kafka代理。偏移量提交的频率由选项auto.commit.interval.ms(默认为50ms)描述。这意味着每50ms Librdkafka(管理PHP进程底下的Kafka的库)将当前存储的偏移量发送到Kafka代理。这也意味着,如果您在49ms后终止PHP进程,则消息将不会被提交,并且在重新启动消费者后,您将再次收到相同的消息。这种情况非常不可能,但可能会发生。

Apache Kafka保证每个消息、每个主题、每个消费者组.id的至少一次投递。这种行为的一个含义是,如果偏移量未提交给代理,Kafka会再次发送相同消息。开发者需要处理这种情况。

要确保偏移量提交的100%正确,一种方法是通过手动处理将enable.auto.commit设置为false。然后您可以使用CommitOffsetTrait::commitOffset()方法将当前偏移量发送到代理。

<?php

declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Traits\CommitOffsetTrait;

class ExampleConsumer implements ConsumerInterface
{
   use CommitOffsetTrait;
   
   public function consume(Message $message, Context $context): void
   {
      // process the message
      $this->commitOffset($context); // manually commits the offset
   }
}

手动提交偏移量几乎可以保证您不会再次收到相同的消息。然而,仍然有很小的可能性,偏移量不会保存到代理,例如在网络问题的情况下。同样,开发者需要处理这种情况(可能在提交偏移量时使用try...catch块)。

然而,手动提交有一个很大的缺点——它们很慢。原因是提交必须在您的PHP进程中完成,因此会阻塞主线程。每次提交可能需要约40-50毫秒,对于Kafka来说这是极其缓慢的。您可以将true作为第二个参数传递给$this->commitOffset($context, true);,这样手动提交就会异步处理,从而变得更快——但如果您的PHP进程在提交时死亡,一些偏移量可能不会发送到代理(当将enable.auto.commit设置为true且进程死亡时的情况几乎相同)。

根据上述情况,建议保持enable.auto.commit选项设置为true,并在您的应用程序内部处理可能的重复消息。

解码器

解码器的作用是将原始Kafka数据(json、avro、纯文本或其他任何格式)转换为PHP数组(或实际上您想要的任何格式)。有三种解码器可用

  • AvroDecoder
  • JsonDecoder(它实际上只对Kafka原始数据进行json_decode操作)
  • PlainDecoder(它实际上不解码消息,而是将原始版本传递给您)

默认情况下,此包使用AvroDecoder,并需要schema_registry配置。Schema registry应包含消费消息的schema版本。

您也可以通过实现DecoderInterface来实现自己的解码器

<?php

namespace App\Decoder;

use StsGamingGroup\KafkaBundle\Configuration\ResolvedConfiguration;
use StsGamingGroup\KafkaBundle\Decoder\Contract\DecoderInterface;

class CustomDecoder implements DecoderInterface
{
    public function decode(ResolvedConfiguration $configuration, string $message)
    {
        // $configuration contains values from sts_gaming_group_kafka.yaml or CLI
        // $message contains raw value from Kafka
    }
}

将其注册到您的配置中

sts_gaming_group_kafka:
  consumers:
    instances:
      App\Consumers\ExampleConsumer:
        decoder: App\Decoder\CustomDecoder

反规范化器

您还可能希望将消息反规范化为某种DTO或其他您希望的对象。默认情况下,此捆绑包不会将消息反规范化为任何对象,而是传递一个数组(来自AvroDecoder)。

您的反规范化器必须实现DenormalizerInterface,并需要您实现denormalize方法。返回值可以是任何类型。

<?php

declare(strict_types=1);

namespace App\Normalizer;

use StsGamingGroup\KafkaBundle\Denormalizer\Contract\DenormalizerInterface;

class CustomDenormalizer implements DenormalizerInterface
{
    public function denormalize($data): MessageDTO
    {
        // $data is an array which comes from AvroDecoder or some other registered Decoder
        $messageDTO = new MessageDTO();
        $messageDTO->setName($data['name']);

        return $messageDTO;
    }
}

将其注册到您的配置中

sts_gaming_group_kafka:
  consumers:
    instances:
      App\Consumers\ExampleConsumer:
        denormalizer: App\Normalizer\CustomDenormalizer

在消费者中接收它

<?php

...

class ExampleConsumer implements ConsumerInterface
{
    public function consume(Message $message, Context $context): void
    {
        $messageDTO = $message->getData(); // $messageDTO comes from CustomDenormalizer
    }
}

验证器

在反规范化之前或之后,您可能想要验证给定的对象是否应该传递给您的消费者——例如,您可能想要过滤掉来自代理的不完整数据。

  1. 创建验证器
<?php

declare(strict_types=1);

namespace App\Validator;

use StsGamingGroup\KafkaBundle\Validator\Contract\ValidatorInterface;
use StsGamingGroup\KafkaBundle\Validator\Validator;

class ExampleValidator implements ValidatorInterface
{
    public function validate($decoded): bool
    {
        return !array_key_exists('foo', $decoded);
    }

    public function failureReason($decoded): string
    {
        return sprintf('Missing foo key in decoded message.');
    }
    
    public function type() : string
    {
       return Validator::PRE_DENORMALIZE_TYPE; // runs before denormalization
       // Validator::POST_DENORMALIZE_TYPE // runs after denormalization
    }
}

将其注册到您的配置中

sts_gaming_group_kafka:
  consumers:
    instances:
      App\Consumers\ExampleConsumer:
        validators: 
         - App\Validator\ExampleValidator
         - App\Validator\SomeOtherValidator      

您可以为单个消费者附加多个验证器。调用验证器的优先级与您在sts_gaming_group_kafka.yaml中定义的完全一致,因此在这种情况下,ExampleValidator首先被调用,然后是SomeOtherValidator。

如果验证器返回false,则抛出一个ValidatorException实例。

 ...
 
 use StsGamingGroup\KafkaBundle\Validator\Exception\ValidationException;
 
 public function handleException(\Exception $exception, Context $context)
 {
     if ($exception instanceof ValidationException) {      
         $decoded = $exception->getData();
         $this->logger->info(
             sprintf(
                 'Message has not passed validation. Id: %s  | Reason: %s', 
                 $decoded['id'],
                 $exception->getFailedReason())
         );
     }
 }

未通过验证的消息的偏移量将自动提交。

事件

消费者使用symfony/event-dispatcher组件作为可选依赖项来调度事件

仅针对当前运行的消费者

  • sts_gaming_group_kafka.pre_message_consumed_{consumer_name} 例如:sts_gaming_group_kafka.pre_message_consumed_example_consumer
  • sts_gaming_group_kafka.post_message_consumed_{consumer_name} 例如:sts_gaming_group_kafka.post_message_consumed_example_consumer

适用于所有消费者的全局事件

  • StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent
  • StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent

正如其名所示 - 在消息被消费之前,首先派发第一个事件,第二个事件则在消息被消费之后立即派发(不考虑重试机制,消息必须被完全处理才能派发事件)。您可以使用 symfony 事件订阅者/监听器来挂钩这些事件,即

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent;
use StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent;

class ExampleConsumerEventSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents(): array
    {
        return [
            PreMessageConsumedEvent::getEventName('example_consumer') => 'onPreMessageConsumed',
            PostMessageConsumedEvent::getEventName('example_consumer') => 'onPostMessageConsumed',
            PreMessageConsumedEvent::class => 'onGlobalPreMessageConsumed',
            PostMessageConsumedEvent::class => 'onGlobalPostMessageConsumed'
        ];
    }

    public function onPreMessageConsumed(PreMessageConsumedEvent $event): void
    {
        $event->getConsumedMessages(); // number of processed messages
        $event->getConsumptionTimeMs(); // how long consumer is running
    }

    public function onPostMessageConsumed(PostMessageConsumedEvent $event): void
    {
        $event->getConsumedMessages();
        $event->getConsumptionTimeMs();
    }
}

Kafka回调

Librdkafka(PHP底层使用的C/C++库)提供了一些回调,您可以在不同的场景中使用(消费/生产/错误处理/日志记录)。您的消费者必须实现CallableInterface,这要求您定义callbacks方法。此方法应返回您希望自行处理的回调数组。

<?php

declare(strict_types=1);

namespace App\Consumers;

use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface;
use StsGamingGroup\KafkaBundle\RdKafka\Callbacks;

class ExampleConsumer implements ConsumerInterface, CallableInterface
{
    public function callbacks(): array
    {
        return [
            Callbacks::OFFSET_COMMIT_CALLBACK => static function (
                \RdKafka\KafkaConsumer $kafkaConsumer,
                int $error,
                array $partitions
            ) {
                // call some action according to i.e. error
            },
            Callbacks::LOG_CALLBACK => static function ($kafka, int $level, string $facility, string $message) {
                // log it somewhere
            }
        ];
    }
    
    // other methods
}
 

生产消息

  1. 要生产消息,您必须在sts_gaming_group_kafka.yaml中配置一些选项
producers:
 instances:
   App\Producers\ExampleProducer:
     brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ]
     producer_topic: 'topic_i_want_to_produce_to' #only one topic allowed per producer
  1. 创建您想要操作的数据对象(例如某些实体或DTO)
<?php

declare(strict_types=1);

namespace App\Producers;

class SomeEntity
{
    private int $id;
    private string $name;

    public function __construct(int $id, string $name)
    {
        $this->id = $id;
        $this->name = $name;
    }

    public function toArray(): array
    {
        return [
            'id' => $this->id,
            'name' => $this->name
        ];
    }
}
  1. 创建一个生产者,该生产者将处理您的数据对象并创建Kafka消息
<?php

declare(strict_types=1);

namespace App\Producers;

use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;

class ExampleProducer implements ProducerInterface
{
    public function produce($data): Message
    {
        /** @var SomeEntity $data */
        return new Message(json_encode($data->toArray()), $data->getId());
        // first argument of Message is the payload as a string
        // second argument is a message key which is used to help kafka partition messages
    }

    public function supports($data): bool
    {
        // in case of many producers you should check what $data is passed here
        return $data instanceof SomeEntity;
    }
}
  1. 通过调用ProducerClient::produce()推送消息,即在您的Command类中某个位置
<?php

declare(strict_types=1);

namespace App\Command;

use StsGamingGroup\KafkaBundle\Client\Producer\ProducerClient;

class ExampleCommand extends Command
{ 
 public function __construct(ProducerClient $client, SomeEntityRepository $repository)
 {
     $this->client = $client;
     $this->repository = $repository;
 }
 
 protected function execute(InputInterface $input, OutputInterface $output): int
 {
     $someEntities = $this->repository->findAll();
     foreach ($someEntities as $entity) {
         $this->client->produce($entity);
     }

     $this->client->flush(); // call flush after produce() method has finished

     return Command::SUCCESS;
 }
  1. 要将消息生产到特定的分区,您的生产者可以实现PartitionAwareProducerInterface
<?php

declare(strict_types=1);

namespace App\Producers;

use StsGamingGroup\KafkaBundle\Client\Contract\PartitionAwareProducerInterface;
use StsGamingGroup\KafkaBundle\Client\Producer\Message;

class ExampleProducer implements ProducerInterface
{
    public function produce($data): Message
    {
        /** @var SomeEntity $data */
        return new Message(json_encode($data->toArray()), $data->getId());
    }

    public function getPartition($data, ResolvedConfiguration $configuration): int
    {
        /** @var SomeEntity $data */
        return $data->getId() % 16; // calculating modulo from object id to produce to maximum of 16 partitions (0-15)
    }
}
  1. 您还可以将回调数组设置到生产者中,例如,检查消息是否已成功发送。您生产者类应实现CallableInterface。
use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface;
use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface;

class ExampleProducer implements ProducerInterface, CallableInterface
{
    public function callbacks(): array
    {
        // callbacks array just like in Consumer example
    }
}
  1. 可以在运行时配置ProducerClient的其他选项
$this->producerClient
   ->setPollingBatch(25000)   
   ->setPollingTimeoutMs(1000)
   ->setFlushTimeoutMs(500)
   ->setMaxFlushRetries(10);
  • 轮询批量 - 在多少消息(如上面的示例中的$someEntities循环)之后,ProducerClient应该调用librdkafka的poll方法。如果您生产大消息且不经常调用poll,可能会出现librdkafka内部队列满的问题。此外,消费者在调用poll之前不会收到任何消息。因此,建议将轮询批量数保持在合理水平,例如10000或20000
  • 轮询超时毫秒 - librdkafka等待轮询消息完成的时间
  • 刷新超时毫秒,最大刷新重试次数 - 在调用flush()之后,ProducerClient将尝试刷新librdkafka内部队列中剩余的消息。剩余消息是那些尚未被poll的消息。

自定义配置

有时您可能希望向您的消费者对象传递一些额外的选项。您可以添加自己的配置

<?php

declare(strict_types=1);

namespace App\Configuration;

use StsGamingGroup\KafkaBundle\Configuration\Contract\ConfigurationInterface;
use Symfony\Component\Console\Input\InputOption;

class Divisor implements ConfigurationInterface
{
    public function getName(): string
    {
        return 'divisor';
    }

    public function getMode(): int
    {
        return InputOption::VALUE_REQUIRED;
    }

    public function getDescription(): string
    {
        return 'Option description';
    }

    public function isValueValid($value): bool
    {
        return is_numeric($value) && $value > 0;
    }

    public function getDefaultValue(): int
    {
        return 1;
    }
}

自定义选项只能通过CLI传递

bin/console kafka:consumers:consume example_consumer --divisor 4 --remainder 1 --group_id first_group
bin/console kafka:consumers:consume example_consumer --divisor 4 --remainder 2 --group_id second_group
etc.

您将在consume方法中收到它,并可以据此采取行动。

class ExampleConsumer implements ConsumerInterface
{
    public const CONSUMER_NAME = 'example_consumer';

    public function consume(Message $message, Context $context): void
    {
        $divisor = $context->getValue(Divisor::NAME);
        $remainder = $context->getValue(Remainder::NAME);
        
        if ($message->getId() % $divisor !== $remainder) {
            return; // let's skip that message
        }
        
        // process message normally
    }
}

上面的示例显示了如何通过执行例如4个消费者/命令来扩展应用程序,这些消费者/命令具有不同的剩余量和组ID。如果您的主题只有一个分区且无法扩展消费者,您可能需要采取这种策略。

显示当前消费者/生产者配置

您可以通过调用以下命令来显示将传递给消费者的当前配置

bin/console kafka:consumers:describe example_consumer
┌───────────────────────────┬─────────────────────────────────────────────────────────┐
│ configuration             │ value                                                   │
├───────────────────────────┼─────────────────────────────────────────────────────────┤
│ class                     │ App\Consumers\ExampleConsumer                           │
│ topics                    │ some_topic                                              │
│ group_id                  │ some_group_id                                           │
│ brokers                   │ 127.0.0.1:9092, 127.0.0.2:9092, 127.0.0.3:9092          │
│ offset_store_method       │ broker                                                  │
│ timeout                   │ 1000                                                    │
│ auto_offset_reset         │ smallest                                                │
│ auto_commit_interval_ms   │ 5                                                       │
│ decoder                   │ StsGamingGroup\KafkaBundle\Decoder\AvroDecoder                     │
│ schema_registry           │ http://127.0.0.1:8081                                   │
│ enable_auto_offset_store  │ true                                                    │
│ enable_auto_commit        │ true                                                    │
│ log_level                 │ 3                                                       │
│ register_missing_schemas  │ false                                                   │
│ register_missing_subjects │ false                                                   │
│ denormalizer              │ App\Normalizer\CustomDenormalizer                       │
│ max_retries               │ 3                                                       │
│ retry_delay               │ 250                                                     │
│ retry_multiplier          │ 3                                                       │
│ max_retry_delay           │ 3000                                                    │
└───────────────────────────┴─────────────────────────────────────────────────────────┘

您可以通过运行以下命令来显示生产者配置

bin/console kafka:producers:describe
┌────────────────────┬─────────────────────────────────────────────────────────┐
│ configuration      │ value                                                   │
├────────────────────┼─────────────────────────────────────────────────────────┤
│ class              │ App\Producers\ExampleProducer                           │
│ brokers            │ 127.0.0.1:9092, 127.0.0.2:9092, 127.0.0.3:9092          │
│ log_level          │ 3                                                       │
│ producer_partition │ -1                                                      │
│ producer_topic     │ topic_i_want_to_produce_to                              │
└────────────────────┴─────────────────────────────────────────────────────────┘

许可证

本软件包在MIT许可证下分发。请参阅LICENSE.md获取更多详细信息。