jobcloud / php-kafka-lib
Jobcloud PHP Kafka 库
v2.0.0
2022-12-30 09:21 UTC
Requires
- php: ^8.0
- ext-json: *
- ext-rdkafka: ^4.0|^5.0|^6.0
Requires (Dev)
- flix-tech/avro-serde-php: ^1.4
- infection/infection: ^0.26
- johnkary/phpunit-speedtrap: ^3.1
- kwn/php-rdkafka-stubs: ^2.0.0
- php-mock/php-mock-phpunit: ^2.6
- phpstan/phpstan: ^1.8
- phpunit/phpunit: ^9.4
- rregeer/phpunit-coverage-check: ^0.3.1
- squizlabs/php_codesniffer: ^3.5.4
Suggests
- flix-tech/avro-serde-php: Is needed for Avro support
This package is auto-updated.
Last update: 2024-08-27 11:56:41 UTC
README
描述
这是一个使 PHP 项目中使用 Kafka 更容易的库。
该库依赖于 arnaud-lb/php-rdkafka
Avro 支持依赖于 flix-tech/avro-serde-php
php 扩展的 文档,
有助于理解该库的内部结构。
要求
- php: ^8.0
- ext-rdkafka: >=4.0.0
- librdkafka: >=0.11.6 (如果你使用
<librdkafka:1.x
,请定义自己的错误回调)
⚠️ 要使用事务性生产者,你需要
- ext-rdkafka: >=4.1.0
- librdkafka: >=1.4
安装
composer require jobcloud/php-kafka-lib "~1.0"
启用 Avro 支持
如果你需要 Avro 支持,运行
composer require flix-tech/avro-serde-php "~1.4"
用法
生产者
Kafka
简单示例
<?php use Jobcloud\Kafka\Message\KafkaProducerMessage; use Jobcloud\Kafka\Producer\KafkaProducerBuilder; $producer = KafkaProducerBuilder::create() ->withAdditionalBroker('localhost:9092') ->build(); $message = KafkaProducerMessage::create('test-topic', 0) ->withKey('asdf-asdf-asfd-asdf') ->withBody('some test message payload') ->withHeaders([ 'key' => 'value' ]); $producer->produce($message); // Shutdown producer, flush messages that are in queue. Give up after 20s $result = $producer->flush(20000);
事务性生产者(需要 >=php-rdkafka:4.1 和 >=librdkafka:1.4)
<?php use Jobcloud\Kafka\Message\KafkaProducerMessage; use Jobcloud\Kafka\Producer\KafkaProducerBuilder; use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException; use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException; use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException; $producer = KafkaProducerBuilder::create() ->withAdditionalBroker('localhost:9092') ->build(); $message = KafkaProducerMessage::create('test-topic', 0) ->withKey('asdf-asdf-asfd-asdf') ->withBody('some test message payload') ->withHeaders([ 'key' => 'value' ]); try { $producer->beginTransaction(10000); $producer->produce($message); $producer->commitTransaction(10000); } catch (KafkaProducerTransactionRetryException $e) { // something went wrong but you can retry the failed call (either beginTransaction or commitTransaction) } catch (KafkaProducerTransactionAbortException $e) { // you need to call $producer->abortTransaction(10000); and try again } catch (KafkaProducerTransactionFatalException $e) { // something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees } // Shutdown producer, flush messages that are in queue. Give up after 20s $result = $producer->flush(20000);
Avro 生产者
要创建一个 Avro 生产者,请添加 avro 编码器。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; use Jobcloud\Kafka\Message\KafkaProducerMessage; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry; use Jobcloud\Kafka\Producer\KafkaProducerBuilder; use Jobcloud\Kafka\Message\KafkaAvroSchema; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use GuzzleHttp\Client; $cachedRegistry = new CachedRegistry( new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081']) ) ), new AvroObjectCacheAdapter() ); $registry = new AvroSchemaRegistry($cachedRegistry); $recordSerializer = new RecordSerializer($cachedRegistry); //if no version is defined, latest version will be used //if no schema definition is defined, the appropriate version will be fetched form the registry $registry->addBodySchemaMappingForTopic( 'test-topic', new KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */) ); $registry->addKeySchemaMappingForTopic( 'test-topic', new KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */) ); // if you are only encoding key or value, you can pass that mode as additional third argument // per default both key and body will get encoded $encoder = new AvroEncoder($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */); $producer = KafkaProducerBuilder::create() ->withAdditionalBroker('kafka:9092') ->withEncoder($encoder) ->build(); $schemaName = 'testSchema'; $version = 1; $message = KafkaProducerMessage::create('test-topic', 0) ->withKey('asdf-asdf-asfd-asdf') ->withBody(['name' => 'someName']) ->withHeaders([ 'key' => 'value' ]); $producer->produce($message); // Shutdown producer, flush messages that are in queue. Give up after 20s $result = $producer->flush(20000);
注意:为了提高生产者延迟,你可以安装 pcntl
扩展。
php-kafka-lib 已经有相应的代码,具体描述如下
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings
消费者
Kafka 高级
<?php use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException; use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException; $consumer = KafkaConsumerBuilder::create() ->withAdditionalConfig( [ 'compression.codec' => 'lz4', 'auto.commit.interval.ms' => 500 ] ) ->withAdditionalBroker('kafka:9092') ->withConsumerGroup('testGroup') ->withAdditionalSubscription('test-topic') ->build(); $consumer->subscribe(); while (true) { try { $message = $consumer->consume(); // your business logic $consumer->commit($message); } catch (KafkaConsumerTimeoutException $e) { //no messages were read in a given time } catch (KafkaConsumerEndOfPartitionException $e) { //only occurs if enable.partition.eof is true (default: false) } catch (KafkaConsumerConsumeException $e) { // Failed } }
Kafka 低级
<?php use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException; use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException; $consumer = KafkaConsumerBuilder::create() ->withAdditionalConfig( [ 'compression.codec' => 'lz4', 'auto.commit.interval.ms' => 500 ] ) ->withAdditionalBroker('kafka:9092') ->withConsumerGroup('testGroup') ->withAdditionalSubscription('test-topic') ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL) ->build(); $consumer->subscribe(); while (true) { try { $message = $consumer->consume(); // your business logic $consumer->commit($message); } catch (KafkaConsumerTimeoutException $e) { //no messages were read in a given time } catch (KafkaConsumerEndOfPartitionException $e) { //only occurs if enable.partition.eof is true (default: false) } catch (KafkaConsumerConsumeException $e) { // Failed } }
Avro 消费者
要创建一个 Avro 消费者,请添加 avro 解码器。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException; use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException; use Jobcloud\Kafka\Message\Decoder\AvroDecoder; use Jobcloud\Kafka\Message\KafkaAvroSchema; use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use GuzzleHttp\Client; $cachedRegistry = new CachedRegistry( new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081']) ) ), new AvroObjectCacheAdapter() ); $registry = new AvroSchemaRegistry($cachedRegistry); $recordSerializer = new RecordSerializer($cachedRegistry); //if no version is defined, latest version will be used //if no schema definition is defined, the appropriate version will be fetched form the registry $registry->addBodySchemaMappingForTopic( 'test-topic', new KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */) ); $registry->addKeySchemaMappingForTopic( 'test-topic', new KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */) ); // If you are only encoding / decoding key or value, only register the schema(s) you need. // It is advised against doing that though, some tools might not play // nice if you don't fully encode your message $decoder = new AvroDecoder($registry, $recordSerializer); $consumer = KafkaConsumerBuilder::create() ->withAdditionalConfig( [ 'compression.codec' => 'lz4', 'auto.commit.interval.ms' => 500 ] ) ->withDecoder($decoder) ->withAdditionalBroker('kafka:9092') ->withConsumerGroup('testGroup') ->withAdditionalSubscription('test-topic') ->build(); $consumer->subscribe(); while (true) { try { $message = $consumer->consume(); // your business logic $consumer->commit($message); } catch (KafkaConsumerTimeoutException $e) { //no messages were read in a given time } catch (KafkaConsumerEndOfPartitionException $e) { //only occurs if enable.partition.eof is true (default: false) } catch (KafkaConsumerConsumeException $e) { // Failed } }
更多信息
替代 messaging-lib
查看 MIGRATION.md 以获取迁移帮助。