geekcell / kafka-bundle

安装数: 3,964

依赖者: 0

建议者: 0

安全性: 0

星标: 2

关注者: 4

分支: 0

公开问题: 0

类型:symfony-bundle

1.0.1 2023-01-09 15:13 UTC

This package is auto-updated.

Last update: 2024-09-09 19:56:31 UTC


README

这是一个有偏见的 Symfony 扩展包,用于轻松集成 Apache Kafka

需求

安装

要使用此扩展包,请在 Composer 中要求它

composer require geekcell/kafka-bundle

快速入门

通过继承 Record 来定义您想发送到 Kafka 的记录。

use GeekCell\KafkaBundle\Record;
use FlixTech\AvroSerializer\Objects\Schema;

class OrderDto extends Record
{
    public int $id;
    public int $productId;
    public int $customerId;
    public int $quantity;
    public float $total;
    public string $status = 'PENDING';

    public function getKey(): ?string
    {
        // Nullable; if provided it will be used as message key
        // to preserve message ordering.
        return sprintf('order_%s', $this->id);
    }

    protected function withFields(RecordType $root): Schema
    {
        // See for examples:
        // https://github.com/flix-tech/avro-serde-php/tree/master/test/Objects/Schema
        $root
            ->field('id', Schema::int())
            ->field('productId', Schema::int())
            ->field('customerId', Schema::int())
            ->field('quantity', Schema::int())
            ->field('total', Schema::float())
            ->field(
                'status', 
                Schema::enum()
                    ->name('OrderStatusEnum')
                    ->symbols(...['PENDING', 'PAID', 'SHIPPED', 'CANCELLED'])
                    ->default('PENDING'),
            );

        return $root;
    }
}

创建一个事件,该事件实现 Event 合同并返回上述记录作为 主题

use GeekCell\KafkaBundle\Contracts\Event;

class OrderPlacedEvent implements Event
{
    public function __construct(
        private OrderDto $orderDto,
    ) {
    }

    public function getSubject(): Record
    {
        return $this->orderDto;
    }
}

如果您通过标准 Symfony 事件调度器分发 Event,它将被自动序列化为 Avro 格式,注册,并根据您的配置发送到 Kafka。

$this->eventDispatcher->dispatch(new OrderPlacedEvent($orderDto));

扩展包配置示例

geek_cell_kafka:
    avro:
        schema_registry_url: 'http://schemaregistry:8081'
        schemas:
            defaults:
                namespace: 'com.acme.avro'
    events:
        lookup: # Look up events in the following directories
            - 'src/Event'

    kafka:
        brokers: 'broker:9091,broker:9092'
        global:
            # Global config params for librdkafka (not pre-validated)
            # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties
        topic:
            # Topic config params for librdkafka (not pre-validated)
            # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties