sts-gaming-group / kafka-bundle
用于消费和发送Apache Kafka消息的捆绑包。
Requires
- php: >=7.4
- ext-json: *
- ext-rdkafka: *
- flix-tech/avro-serde-php: ^2.0
- symfony/config: ^4.3.3|^5.0|^6.0
- symfony/console: ^4.0|^5.2.0|^6.0
- symfony/dependency-injection: ^4.3.3|^5.2|^6.0
- symfony/http-kernel: ^4.3|^5.2.1|^6.0
- symfony/options-resolver: ~4.3|^5.2|^6.0
Requires (Dev)
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^0.12.75
- phpstan/phpstan-phpunit: ^0.12.17
- phpstan/phpstan-symfony: ^0.12.18
- squizlabs/php_codesniffer: ^3.5
- symfony/dotenv: 5.2.*|^6.0
- symfony/event-dispatcher: ^5.2|^6.0
- symfony/framework-bundle: ^5.2|^6.0
- symfony/phpunit-bridge: ^5.2|^6.0
This package is auto-updated.
Last update: 2024-09-04 08:35:05 UTC
README
技术栈
- PHP >=7.4
- PHP的ext-rdkafka
- symfony组件:请参考composer.json
require
部分获取所需的包版本
快速开始
如果您想在您的Symfony项目中安装它
composer require sts-gaming-group/kafka-bundle
示例项目
如果您想在Symfony项目中测试此捆绑包的功能,请参阅https://github.com/sts-gaming-group/kafka-bundle-app项目,该项目包含kafka-bundle和方便的docker-compose文件。
基本配置
- 将sts_gaming_group_kafka.yaml添加到配置文件夹中,路径为config/packages/sts_gaming_group_kafka.yaml或特定的环境文件夹中,例如config/packages/prod/sts_gaming_group_kafka.yaml
- 例如,在sts_gaming_group_kafka.yaml中添加配置
sts_gaming_group_kafka: consumers: instances: App\Consumers\ExampleConsumer: brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ] schema_registry: 'http://127.0.0.1:8081' group_id: 'some_group_id' topics: [ 'some_topic' ] producers: instances: App\Producers\ExampleProducer: brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ] producer_topic: 'my_app_failed_message_topic'
- 通常,您会希望在yaml文件中保留您的kafka配置,但您也可以直接在CLI中传递配置,例如
bin/console kafka:consumers:consume example_consumer --group_id some_other_group_id
目前,CLI中传递的选项仅适用于由命令kafka:consumers:consume
运行的消费者。
配置在运行时解析。优先级如下
- CLI中传递的配置始终优先
- 按消费者/生产者传递的配置(sts_gaming_group_kafka.yaml中的
consumers:
或producers:
部分的instances:
部分)
消费消息
- 创建消费者
<?php declare(strict_types=1); namespace App\Consumers; use StsGamingGroup\KafkaBundle\Client\Consumer\Message; use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface; use StsGamingGroup\KafkaBundle\RdKafka\Context; class ExampleConsumer implements ConsumerInterface { public const CONSUMER_NAME = 'example_consumer'; public function consume(Message $message, Context $context): void { $data = $message->getData(); // contains denormalized data from Kafka $retryNo = $context->getRetryNo(); // contains retry count in case of a failure } public function handleException(\Exception $exception, Context $context): void { // log it or i.e. produce to retry topic based on type of exception } public function getName(): string { return self::CONSUMER_NAME; // consumer unique name in your project } }
- 如果配置正确(合适的代理、主题以及很可能是模式注册表),您应该能够运行您的消费者并接收消息
bin/console kafka:consumers:consume example_consumer
重试失败消息
要触发回退重试,您的消费者应在consume
方法中抛出RecoverableMessageException
。您还必须在sts_gaming_group_kafka.yaml中配置一些重试选项
use StsGamingGroup\KafkaBundle\Client\Consumer\Exception\RecoverableMessageException;
sts_gaming_group_kafka: consumers: ... # global consumers configurations instances: App\Consumers\ExampleConsumer: ... # other configurations max_retries: 3 # defaults to 0 which means it is disabled max_retry_delay: 2500 # defaults to 2000 ms retry_delay: 300 # defaults to 200 ms retry_multiplier: 3 # defaults to 2
使用此配置,您将最多收到相同消息4次(第一次消费+3次重试)。在第一次重试之前,将会有300毫秒的延迟。在第二次重试之前,将会有900毫秒的延迟(retry_delay * retry_multiplier)。在第三次重试之前,将会有2500毫秒的延迟(max_retry_delay)。重要的是要记住,在Kafka中提交偏移量,以防消息永久失败(如果enable_auto_commit设置为false)。
消费者中的任何未捕获的异常都会使消费者关闭。
处理偏移量
默认情况下,选项enable.auto.commit
设置为true。在这种情况下,在消费消息后,偏移量将自动提交到Kafka代理。偏移量提交的频率由选项auto.commit.interval.ms
(默认为50ms)描述。这意味着每50ms Librdkafka(管理PHP进程底下的Kafka的库)将当前存储的偏移量发送到Kafka代理。这也意味着,如果您在49ms后终止PHP进程,则消息将不会被提交,并且在重新启动消费者后,您将再次收到相同的消息。这种情况非常不可能,但可能会发生。
Apache Kafka保证每个消息、每个主题、每个消费者组.id的至少一次投递。这种行为的一个含义是,如果偏移量未提交给代理,Kafka会再次发送相同消息。开发者需要处理这种情况。
要确保偏移量提交的100%正确,一种方法是通过手动处理将enable.auto.commit
设置为false。然后您可以使用CommitOffsetTrait::commitOffset()
方法将当前偏移量发送到代理。
<?php declare(strict_types=1); namespace App\Consumers; use StsGamingGroup\KafkaBundle\Client\Traits\CommitOffsetTrait; class ExampleConsumer implements ConsumerInterface { use CommitOffsetTrait; public function consume(Message $message, Context $context): void { // process the message $this->commitOffset($context); // manually commits the offset } }
手动提交偏移量几乎可以保证您不会再次收到相同的消息。然而,仍然有很小的可能性,偏移量不会保存到代理,例如在网络问题的情况下。同样,开发者需要处理这种情况(可能在提交偏移量时使用try...catch
块)。
然而,手动提交有一个很大的缺点——它们很慢。原因是提交必须在您的PHP进程中完成,因此会阻塞主线程。每次提交可能需要约40-50毫秒,对于Kafka来说这是极其缓慢的。您可以将true
作为第二个参数传递给$this->commitOffset($context, true);
,这样手动提交就会异步处理,从而变得更快——但如果您的PHP进程在提交时死亡,一些偏移量可能不会发送到代理(当将enable.auto.commit
设置为true且进程死亡时的情况几乎相同)。
根据上述情况,建议保持enable.auto.commit
选项设置为true,并在您的应用程序内部处理可能的重复消息。
解码器
解码器的作用是将原始Kafka数据(json、avro、纯文本或其他任何格式)转换为PHP数组(或实际上您想要的任何格式)。有三种解码器可用
- AvroDecoder
- JsonDecoder(它实际上只对Kafka原始数据进行json_decode操作)
- PlainDecoder(它实际上不解码消息,而是将原始版本传递给您)
默认情况下,此包使用AvroDecoder,并需要schema_registry配置。Schema registry应包含消费消息的schema版本。
您也可以通过实现DecoderInterface
来实现自己的解码器
<?php namespace App\Decoder; use StsGamingGroup\KafkaBundle\Configuration\ResolvedConfiguration; use StsGamingGroup\KafkaBundle\Decoder\Contract\DecoderInterface; class CustomDecoder implements DecoderInterface { public function decode(ResolvedConfiguration $configuration, string $message) { // $configuration contains values from sts_gaming_group_kafka.yaml or CLI // $message contains raw value from Kafka } }
将其注册到您的配置中
sts_gaming_group_kafka: consumers: instances: App\Consumers\ExampleConsumer: decoder: App\Decoder\CustomDecoder
反规范化器
您还可能希望将消息反规范化为某种DTO或其他您希望的对象。默认情况下,此捆绑包不会将消息反规范化为任何对象,而是传递一个数组(来自AvroDecoder)。
您的反规范化器必须实现DenormalizerInterface,并需要您实现denormalize
方法。返回值可以是任何类型。
<?php declare(strict_types=1); namespace App\Normalizer; use StsGamingGroup\KafkaBundle\Denormalizer\Contract\DenormalizerInterface; class CustomDenormalizer implements DenormalizerInterface { public function denormalize($data): MessageDTO { // $data is an array which comes from AvroDecoder or some other registered Decoder $messageDTO = new MessageDTO(); $messageDTO->setName($data['name']); return $messageDTO; } }
将其注册到您的配置中
sts_gaming_group_kafka: consumers: instances: App\Consumers\ExampleConsumer: denormalizer: App\Normalizer\CustomDenormalizer
在消费者中接收它
<?php ... class ExampleConsumer implements ConsumerInterface { public function consume(Message $message, Context $context): void { $messageDTO = $message->getData(); // $messageDTO comes from CustomDenormalizer } }
验证器
在反规范化之前或之后,您可能想要验证给定的对象是否应该传递给您的消费者——例如,您可能想要过滤掉来自代理的不完整数据。
- 创建验证器
<?php declare(strict_types=1); namespace App\Validator; use StsGamingGroup\KafkaBundle\Validator\Contract\ValidatorInterface; use StsGamingGroup\KafkaBundle\Validator\Validator; class ExampleValidator implements ValidatorInterface { public function validate($decoded): bool { return !array_key_exists('foo', $decoded); } public function failureReason($decoded): string { return sprintf('Missing foo key in decoded message.'); } public function type() : string { return Validator::PRE_DENORMALIZE_TYPE; // runs before denormalization // Validator::POST_DENORMALIZE_TYPE // runs after denormalization } }
将其注册到您的配置中
sts_gaming_group_kafka: consumers: instances: App\Consumers\ExampleConsumer: validators: - App\Validator\ExampleValidator - App\Validator\SomeOtherValidator
您可以为单个消费者附加多个验证器。调用验证器的优先级与您在sts_gaming_group_kafka.yaml中定义的完全一致,因此在这种情况下,ExampleValidator首先被调用,然后是SomeOtherValidator。
如果验证器返回false,则抛出一个ValidatorException实例。
... use StsGamingGroup\KafkaBundle\Validator\Exception\ValidationException; public function handleException(\Exception $exception, Context $context) { if ($exception instanceof ValidationException) { $decoded = $exception->getData(); $this->logger->info( sprintf( 'Message has not passed validation. Id: %s | Reason: %s', $decoded['id'], $exception->getFailedReason()) ); } }
未通过验证的消息的偏移量将自动提交。
事件
消费者使用symfony/event-dispatcher组件作为可选依赖项来调度事件
仅针对当前运行的消费者
- sts_gaming_group_kafka.pre_message_consumed_{consumer_name} 例如:sts_gaming_group_kafka.pre_message_consumed_example_consumer
- sts_gaming_group_kafka.post_message_consumed_{consumer_name} 例如:sts_gaming_group_kafka.post_message_consumed_example_consumer
适用于所有消费者的全局事件
- StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent
- StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent
正如其名所示 - 在消息被消费之前,首先派发第一个事件,第二个事件则在消息被消费之后立即派发(不考虑重试机制,消息必须被完全处理才能派发事件)。您可以使用 symfony 事件订阅者/监听器来挂钩这些事件,即
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use StsGamingGroup\KafkaBundle\Event\PostMessageConsumedEvent; use StsGamingGroup\KafkaBundle\Event\PreMessageConsumedEvent; class ExampleConsumerEventSubscriber implements EventSubscriberInterface { public static function getSubscribedEvents(): array { return [ PreMessageConsumedEvent::getEventName('example_consumer') => 'onPreMessageConsumed', PostMessageConsumedEvent::getEventName('example_consumer') => 'onPostMessageConsumed', PreMessageConsumedEvent::class => 'onGlobalPreMessageConsumed', PostMessageConsumedEvent::class => 'onGlobalPostMessageConsumed' ]; } public function onPreMessageConsumed(PreMessageConsumedEvent $event): void { $event->getConsumedMessages(); // number of processed messages $event->getConsumptionTimeMs(); // how long consumer is running } public function onPostMessageConsumed(PostMessageConsumedEvent $event): void { $event->getConsumedMessages(); $event->getConsumptionTimeMs(); } }
Kafka回调
Librdkafka(PHP底层使用的C/C++库)提供了一些回调,您可以在不同的场景中使用(消费/生产/错误处理/日志记录)。您的消费者必须实现CallableInterface,这要求您定义callbacks
方法。此方法应返回您希望自行处理的回调数组。
<?php declare(strict_types=1); namespace App\Consumers; use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface; use StsGamingGroup\KafkaBundle\Client\Contract\ConsumerInterface; use StsGamingGroup\KafkaBundle\RdKafka\Callbacks; class ExampleConsumer implements ConsumerInterface, CallableInterface { public function callbacks(): array { return [ Callbacks::OFFSET_COMMIT_CALLBACK => static function ( \RdKafka\KafkaConsumer $kafkaConsumer, int $error, array $partitions ) { // call some action according to i.e. error }, Callbacks::LOG_CALLBACK => static function ($kafka, int $level, string $facility, string $message) { // log it somewhere } ]; } // other methods }
生产消息
- 要生产消息,您必须在sts_gaming_group_kafka.yaml中配置一些选项
producers: instances: App\Producers\ExampleProducer: brokers: [ '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092' ] producer_topic: 'topic_i_want_to_produce_to' #only one topic allowed per producer
- 创建您想要操作的数据对象(例如某些实体或DTO)
<?php declare(strict_types=1); namespace App\Producers; class SomeEntity { private int $id; private string $name; public function __construct(int $id, string $name) { $this->id = $id; $this->name = $name; } public function toArray(): array { return [ 'id' => $this->id, 'name' => $this->name ]; } }
- 创建一个生产者,该生产者将处理您的数据对象并创建Kafka消息
<?php declare(strict_types=1); namespace App\Producers; use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface; use StsGamingGroup\KafkaBundle\Client\Producer\Message; class ExampleProducer implements ProducerInterface { public function produce($data): Message { /** @var SomeEntity $data */ return new Message(json_encode($data->toArray()), $data->getId()); // first argument of Message is the payload as a string // second argument is a message key which is used to help kafka partition messages } public function supports($data): bool { // in case of many producers you should check what $data is passed here return $data instanceof SomeEntity; } }
- 通过调用ProducerClient::produce()推送消息,即在您的Command类中某个位置
<?php declare(strict_types=1); namespace App\Command; use StsGamingGroup\KafkaBundle\Client\Producer\ProducerClient; class ExampleCommand extends Command { public function __construct(ProducerClient $client, SomeEntityRepository $repository) { $this->client = $client; $this->repository = $repository; } protected function execute(InputInterface $input, OutputInterface $output): int { $someEntities = $this->repository->findAll(); foreach ($someEntities as $entity) { $this->client->produce($entity); } $this->client->flush(); // call flush after produce() method has finished return Command::SUCCESS; }
- 要将消息生产到特定的分区,您的生产者可以实现PartitionAwareProducerInterface
<?php declare(strict_types=1); namespace App\Producers; use StsGamingGroup\KafkaBundle\Client\Contract\PartitionAwareProducerInterface; use StsGamingGroup\KafkaBundle\Client\Producer\Message; class ExampleProducer implements ProducerInterface { public function produce($data): Message { /** @var SomeEntity $data */ return new Message(json_encode($data->toArray()), $data->getId()); } public function getPartition($data, ResolvedConfiguration $configuration): int { /** @var SomeEntity $data */ return $data->getId() % 16; // calculating modulo from object id to produce to maximum of 16 partitions (0-15) } }
- 您还可以将回调数组设置到生产者中,例如,检查消息是否已成功发送。您生产者类应实现CallableInterface。
use StsGamingGroup\KafkaBundle\Client\Contract\CallableInterface; use StsGamingGroup\KafkaBundle\Client\Contract\ProducerInterface; class ExampleProducer implements ProducerInterface, CallableInterface { public function callbacks(): array { // callbacks array just like in Consumer example } }
- 可以在运行时配置ProducerClient的其他选项
$this->producerClient ->setPollingBatch(25000) ->setPollingTimeoutMs(1000) ->setFlushTimeoutMs(500) ->setMaxFlushRetries(10);
- 轮询批量 - 在多少消息(如上面的示例中的$someEntities循环)之后,ProducerClient应该调用librdkafka的
poll
方法。如果您生产大消息且不经常调用poll,可能会出现librdkafka内部队列满的问题。此外,消费者在调用poll
之前不会收到任何消息。因此,建议将轮询批量数保持在合理水平,例如10000或20000 - 轮询超时毫秒 - librdkafka等待轮询消息完成的时间
- 刷新超时毫秒,最大刷新重试次数 - 在调用
flush()
之后,ProducerClient将尝试刷新librdkafka内部队列中剩余的消息。剩余消息是那些尚未被poll
的消息。
自定义配置
有时您可能希望向您的消费者对象传递一些额外的选项。您可以添加自己的配置
<?php declare(strict_types=1); namespace App\Configuration; use StsGamingGroup\KafkaBundle\Configuration\Contract\ConfigurationInterface; use Symfony\Component\Console\Input\InputOption; class Divisor implements ConfigurationInterface { public function getName(): string { return 'divisor'; } public function getMode(): int { return InputOption::VALUE_REQUIRED; } public function getDescription(): string { return 'Option description'; } public function isValueValid($value): bool { return is_numeric($value) && $value > 0; } public function getDefaultValue(): int { return 1; } }
自定义选项只能通过CLI传递
bin/console kafka:consumers:consume example_consumer --divisor 4 --remainder 1 --group_id first_group
bin/console kafka:consumers:consume example_consumer --divisor 4 --remainder 2 --group_id second_group
etc.
您将在consume方法中收到它,并可以据此采取行动。
class ExampleConsumer implements ConsumerInterface { public const CONSUMER_NAME = 'example_consumer'; public function consume(Message $message, Context $context): void { $divisor = $context->getValue(Divisor::NAME); $remainder = $context->getValue(Remainder::NAME); if ($message->getId() % $divisor !== $remainder) { return; // let's skip that message } // process message normally } }
上面的示例显示了如何通过执行例如4个消费者/命令来扩展应用程序,这些消费者/命令具有不同的剩余量和组ID。如果您的主题只有一个分区且无法扩展消费者,您可能需要采取这种策略。
显示当前消费者/生产者配置
您可以通过调用以下命令来显示将传递给消费者的当前配置
bin/console kafka:consumers:describe example_consumer
┌───────────────────────────┬─────────────────────────────────────────────────────────┐
│ configuration │ value │
├───────────────────────────┼─────────────────────────────────────────────────────────┤
│ class │ App\Consumers\ExampleConsumer │
│ topics │ some_topic │
│ group_id │ some_group_id │
│ brokers │ 127.0.0.1:9092, 127.0.0.2:9092, 127.0.0.3:9092 │
│ offset_store_method │ broker │
│ timeout │ 1000 │
│ auto_offset_reset │ smallest │
│ auto_commit_interval_ms │ 5 │
│ decoder │ StsGamingGroup\KafkaBundle\Decoder\AvroDecoder │
│ schema_registry │ http://127.0.0.1:8081 │
│ enable_auto_offset_store │ true │
│ enable_auto_commit │ true │
│ log_level │ 3 │
│ register_missing_schemas │ false │
│ register_missing_subjects │ false │
│ denormalizer │ App\Normalizer\CustomDenormalizer │
│ max_retries │ 3 │
│ retry_delay │ 250 │
│ retry_multiplier │ 3 │
│ max_retry_delay │ 3000 │
└───────────────────────────┴─────────────────────────────────────────────────────────┘
您可以通过运行以下命令来显示生产者配置
bin/console kafka:producers:describe
┌────────────────────┬─────────────────────────────────────────────────────────┐
│ configuration │ value │
├────────────────────┼─────────────────────────────────────────────────────────┤
│ class │ App\Producers\ExampleProducer │
│ brokers │ 127.0.0.1:9092, 127.0.0.2:9092, 127.0.0.3:9092 │
│ log_level │ 3 │
│ producer_partition │ -1 │
│ producer_topic │ topic_i_want_to_produce_to │
└────────────────────┴─────────────────────────────────────────────────────────┘
许可证
本软件包在MIT许可证下分发。请参阅LICENSE.md获取更多详细信息。