jobcloud / messaging-lib
v5.0.0
2020-02-24 08:34 UTC
Requires
- php: ^7.3
- ext-json: *
- ext-rdkafka: ^4.0.0
- flix-tech/avro-serde-php: ^1.3
Requires (Dev)
- johnkary/phpunit-speedtrap: ^3.1
- kwn/php-rdkafka-stubs: ^2.0.0
- php-mock/php-mock-phpunit: ^2.5
- phpstan/phpstan: ^0.11.12
- phpunit/phpunit: ^8.4
- rregeer/phpunit-coverage-check: ^0.3.1
- squizlabs/php_codesniffer: ^3.4.2
- dev-main
- v5.0.0
- v5.0.0-rc1
- v5.0.0-beta
- v5.0.0-alpha2
- v5.0.0-alpha
- dev-master / 4.0.x-dev
- v4.0.5
- v4.0.4
- v4.0.3
- v4.0.2
- v4.0.1
- v4.0.0
- v4.0.0-rc5
- v4.0.0-rc4
- v4.0.0-rc3
- v4.0.0-rc2
- v4.0.0-rc1
- v4.0.0-beta3
- v4.0.0-beta2
- v4.0.0-beta
- 3.0.x-dev
- v3.0.2
- v3.0.1
- v3.0.0
- v2.0.0
- 1.0.x-dev
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
- dev-switch-to-php7.4-image
This package is auto-updated.
Last update: 2020-11-24 12:57:34 UTC
README
此库已被弃用,推荐使用 php-kafka-lib。
直到 2020 年 10 月底,它仍将接收错误修复,之后将在 Packagist 上标记为废弃。
请查看MIGRATION.md 了解迁移路径。
messaging-lib
描述
通用的 PHP 消息库,支持:
- Kafka
这是对 arnaud-lb/php-rdkafka 的便捷封装
Avro 支持依赖于 flix-tech/avro-serde-php
要了解更多关于此库中使用的函数的信息,请参阅该扩展的文档。
要求
- php: ^7.1
- ext-rdkafka: ^4.0.0
安装
composer require jobcloud/messaging-lib "~4.0"
用法
生产者
Kafka
简单示例
<?php

use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;

// Build a producer that knows about a single broker.
$builder = KafkaProducerBuilder::create();
$builder = $builder->withAdditionalBroker('localhost:9092');
$producer = $builder->build();

// Assemble the message step by step: key, body, then headers.
// NOTE(review): the second argument of create() is presumably the partition — confirm.
$message = KafkaProducerMessage::create('test-topic', 0);
$message = $message->withKey('asdf-asdf-asfd-asdf');
$message = $message->withBody('some test message payload');
$message = $message->withHeaders(['key' => 'value']);

$producer->produce($message);
Avro 生产者
要创建 Avro 生产者,请添加 avro 编码器。
<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Jobcloud\Messaging\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;
use Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;

// Schema registry client, wrapped so lookups are blocking and cached.
$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

// If no version is defined, the latest version will be used.
// If no schema definition is defined, the appropriate version will be fetched from the registry.
$registry->addSchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('schemaName' /*, int $version, AvroSchema $definition */)
);

$encoder = new AvroEncoder($registry, $recordSerializer);

// The encoder is what turns this into an Avro producer.
$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('kafka:9092')
    ->withEncoder($encoder)
    ->build();

// The body is an array here (not a string) because it is handed to the Avro encoder.
$message = KafkaProducerMessage::create('test-topic', 0)
    ->withKey('asdf-asdf-asfd-asdf')
    ->withBody(['name' => 'someName'])
    ->withHeaders(['key' => 'value']);

$producer->produce($message);
注意:要降低生产者延迟,您可以安装 pcntl 扩展。
messaging-lib 中已内置相应的代码,其做法与下面链接中的描述类似:
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings
消费者
Kafka 高级
<?php

use Jobcloud\Messaging\Consumer\ConsumerException;
use Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;

// No consumer type is set on the builder, so its default type is used.
$consumer = KafkaConsumerBuilder::create()
    ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500,
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

// Consume forever; each successfully handled message is committed explicitly.
while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        // no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        // only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    }
}
Kafka 低级
<?php

use Jobcloud\Messaging\Consumer\ConsumerException;
use Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;

// Explicitly request the low-level consumer type on the builder.
$consumer = KafkaConsumerBuilder::create()
    ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500,
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
    ->build();

$consumer->subscribe();

// Consume forever; each successfully handled message is committed explicitly.
while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        // no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        // only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    }
}
Avro 消费者
要创建 Avro 消费者,请添加 avro 解码器。
<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Jobcloud\Messaging\Consumer\ConsumerException;
use Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Messaging\Kafka\Message\Decoder\AvroDecoder;
use Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;

// Schema registry client, wrapped so lookups are blocking and cached.
$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

// If no version is defined, the latest version will be used.
// If no schema definition is defined, the appropriate version will be fetched from the registry.
$registry->addSchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('someSchema', 9 /* , AvroSchema $definition */)
);

$decoder = new AvroDecoder($registry, $recordSerializer);

// The decoder is what turns this into an Avro consumer.
$consumer = KafkaConsumerBuilder::create()
    ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500,
        ]
    )
    ->withDecoder($decoder)
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

// Consume forever; each successfully handled message is committed explicitly.
while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        // no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        // only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    }
}
生产者池
<?php

use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Messaging\Producer\ProducerInterface;
use Jobcloud\Messaging\Producer\ProducerPool;

/** @var ProducerInterface $someKafkaProducer */
/** @var ProducerInterface $someRabbitMQProducer */

// Register both producers with the pool.
$pool = new ProducerPool();
$pool
    ->addProducer($someKafkaProducer)
    ->addProducer($someRabbitMQProducer);

// Use the same message class as the other producer examples; the previously
// referenced KafkaMessage was never imported in this snippet.
$message = KafkaProducerMessage::create('test-topic', 0)
    ->withKey('asdf-asdf-asfd-asdf')
    ->withBody('some test content')
    ->withHeaders(['key' => 'value']);

// Produce through the pool rather than through a single producer.
$pool->produce($message);