jobcloud/messaging-lib

此包已被废弃,不再维护。作者建议使用 jobcloud/php-kafka-lib 包。

Jobcloud PHP 消息库(Kafka)

v5.0.0 2020-02-24 08:34 UTC

README

此库已被弃用,推荐使用 php-kafka-lib
直到 2020 年 10 月底,它仍将接收错误修复,之后将在 Packagist 上标记为废弃。
请查看MIGRATION.md 了解迁移路径。

messaging-lib

CircleCI Maintainability Test Coverage Latest Stable Version Latest Unstable Version

描述

通用的 PHP 消息库 支持

  • Kafka

这是对 arnaud-lb/php-rdkafka 的便捷封装
Avro 支持依赖于 flix-tech/avro-serde-php
要了解更多关于此库中使用的函数的信息,请参阅扩展的 文档

要求

  • php: ^7.1
  • ext-rdkafka: ^4.0.0

安装

composer require jobcloud/messaging-lib "~4.0"

用法

生产者

Kafka

简单示例
<?php

use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use \Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;

$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('localhost:9092')
    ->build();

$message = KafkaProducerMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody('some test message payload')
            ->withHeaders([ 'key' => 'value' ]);

$producer->produce($message);
Avro 生产者

要创建 Avro 生产者,请添加 avro 编码器。

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Messaging\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;
use \Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;
use \Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addSchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('schemaName' /*, int $version, AvroSchema $definition */)
);

$encoder = new AvroEncoder($registry, $recordSerializer);

$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('kafka:9092')
    ->withEncoder($encoder)
    ->build();

$schemaName = 'testSchema';
$version = 1;
$message = KafkaProducerMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody(['name' => 'someName'])
            ->withHeaders([ 'key' => 'value' ]);

$producer->produce($message);

注意:要改进生产者延迟,您可以安装 pcntl 扩展。
消息库中已存在类似描述的代码
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings

消费者

Kafka 高级

<?php

use \Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    } 
}

Kafka 低级

<?php

use \Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    } 
}

Avro 消费者

要创建 Avro 消费者,请添加 avro 解码器。

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Messaging\Kafka\Message\Decoder\AvroDecoder;
use Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addSchemaMappingForTopic(
    'test-topic',
    new KafkaAvroSchema('someSchema' , 9 /* , AvroSchema $definition */)
);

$decoder = new AvroDecoder($registry, $recordSerializer);

$consumer = KafkaConsumerBuilder::create()
     ->withAdditionalConfig(
        [
            'compression.codec' => 'lz4',
            'auto.commit.interval.ms' => 500
        ]
    )
    ->withDecoder($decoder)
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withTimeout(120 * 10000)
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (KafkaConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (KafkaConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerException $e) {
        // Failed
    } 
}

生产者池

<?php

use \Jobcloud\Messaging\Producer\ProducerPool;
use \Jobcloud\Messaging\Producer\ProducerInterface;

/** @var ProducerInterface $someKafkaProducer */
/** @var ProducerInterface $someRabbitMQProducer */

$pool = new ProducerPool();
$pool
    ->addProducer($someKafkaProducer)
    ->addProducer($someRabbitMQProducer)
;

$message = KafkaMessage::create('test-topic', 0)
            ->withKey('asdf-asdf-asfd-asdf')
            ->withBody('some test content')
            ->withHeaders([ 'key' => 'value' ]);

$pool->produce($message);