geekcell / kafka-bundle
1.0.1
2023-01-09 15:13 UTC
Requires
- php: ^8.0
- ext-rdkafka: *
- flix-tech/avro-serde-php: ^1.6
- symfony/config: ^6.0
- symfony/dependency-injection: ^6.0
- symfony/event-dispatcher: ^6.0
- symfony/finder: ^6.0
- symfony/http-kernel: ^6.0
- symfony/serializer: ^6.0
- symfony/string: ^6.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.13
- kwn/php-rdkafka-stubs: ^2.2
- phpunit/phpunit: ^9.5
README
这是一个有偏见的 Symfony 扩展包,用于轻松集成 Apache Kafka。
需求
- PHP 8 或更高版本
- php-rdkafka 扩展已安装
- Apache Kafka 代理
- 为 Kafka 配置的 Confluent Schema Registry
安装
要使用此扩展包,请在 Composer 中要求它
composer require geekcell/kafka-bundle
快速入门
通过继承 Record
来定义您想发送到 Kafka 的记录。
use GeekCell\KafkaBundle\Record; use FlixTech\AvroSerializer\Objects\Schema; class OrderDto extends Record { public int $id; public int $productId; public int $customerId; public int $quantity; public float $total; public string $status = 'PENDING'; public function getKey(): ?string { // Nullable; if provided it will be used as message key // to preserve message ordering. return sprintf('order_%s', $this->id); } protected function withFields(RecordType $root): Schema { // See for examples: // https://github.com/flix-tech/avro-serde-php/tree/master/test/Objects/Schema $root ->field('id', Schema::int()) ->field('productId', Schema::int()) ->field('customerId', Schema::int()) ->field('quantity', Schema::int()) ->field('total', Schema::float()) ->field( 'status', Schema::enum() ->name('OrderStatusEnum') ->symbols(...['PENDING', 'PAID', 'SHIPPED', 'CANCELLED']) ->default('PENDING'), ); return $root; } }
创建一个事件,该事件实现 Event
合同并返回上述记录作为 主题。
use GeekCell\KafkaBundle\Contracts\Event; class OrderPlacedEvent implements Event { public function __construct( private OrderDto $orderDto, ) { } public function getSubject(): Record { return $this->orderDto; } }
如果您通过标准 Symfony 事件调度器分发 Event
,它将被自动序列化为 Avro 格式,注册,并根据您的配置发送到 Kafka。
$this->eventDispatcher->dispatch(new OrderPlacedEvent($orderDto));
扩展包配置示例
geek_cell_kafka: avro: schema_registry_url: 'http://schemaregistry:8081' schemas: defaults: namespace: 'com.acme.avro' events: lookup: # Look up events in the following directories - 'src/Event' kafka: brokers: 'broker:9091,broker:9092' global: # Global config params for librdkafka (not pre-validated) # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties topic: # Topic config params for librdkafka (not pre-validated) # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties