daalvand/kafka

This package produces messages to and consumes messages from Kafka.

1.0.0 2021-09-24 10:20 UTC

Last update: 2024-09-24 16:31:42 UTC


  • This package produces and consumes Kafka messages in Laravel

Installation

Install Kafka

Apache Kafka is required by many parts of our ecosystem.

  1. Install librdkafka, the Apache Kafka C/C++ client library

    On Ubuntu: apt install librdkafka-dev

    On CentOS: yum install librdkafka-devel

  2. Then build the PHP extension manually from source

    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure
    make all -j 5
    sudo make install
  3. Then add the extension to php.ini

    extension=rdkafka.so

  4. Then restart the php-fpm service: service php-fpm restart
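
After the restart, it can help to confirm that PHP actually sees the extension. The following is a minimal sketch using PHP's built-in extension_loaded() and phpversion() functions; the helper name rdkafkaStatus is hypothetical and not part of this package. The CLI and php-fpm may read different php.ini files, so the check is most useful when run in the same context as the application.

```php
<?php

// Hypothetical helper (not part of daalvand/kafka): reports whether the
// rdkafka extension is visible to the PHP runtime executing this script.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded';
    }

    // phpversion() returns the extension's version string, or false if unknown.
    $version = phpversion('rdkafka');

    return 'rdkafka extension loaded' . ($version ? " (version $version)" : '');
}

echo rdkafkaStatus(), PHP_EOL;
```

If the extension is reported as not loaded, the extension=rdkafka.so line was probably added to a php.ini that this runtime does not read; php --ini lists the files used by the CLI.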

Install the package

  1. Run composer require daalvand/kafka

  2. Publish the provider

Laravel

  • php artisan vendor:publish --provider="Daalvand\Kafka\KafkaServiceProvider"

Lumen

  • Add the service provider to the bootstrap/app.php file
<?php
 $app->register(Daalvand\Kafka\KafkaServiceProvider::class);
  • Copy the config file from /vendor/daalvand/kafka/src/config to the /config directory, then load it in the /bootstrap/app.php file
<?php

$app->configure("kafka");

Usage

Producer

<?php
use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;

$producer = Producer::withAdditionalBroker('localhost:9092')
    ->build();

$message = (new ProducerMessage('topic-name', 0))
            ->withKey('test-key')
            ->withBody('some test message payload')
            ->withHeaders(['header' => 'value']);

$producer->produce($message);
$producer->flush(-1);

Consumer

<?php

use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerConsumeException;
use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;

$consumer = Consumer::withAdditionalConfig([
            'compression.codec'       => 'lz4',
            'auto.commit.interval.ms' => 500
    ])
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (ConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (ConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerConsumeException $e) {
        // Failed
    }
}
  • The valid values of the auto.offset.reset option are (largest, smallest) for Kafka versions before 0.9 and (earliest, latest) for versions after 0.9
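
The note on auto.offset.reset can be sketched as a small helper that picks the value name by Kafka version. This is only an illustration: offsetResetFor is a hypothetical function, not part of daalvand/kafka, and treating 0.9.0 itself as belonging to the newer naming is an assumption. Recent librdkafka releases are documented as accepting both spellings as aliases, so a helper like this may only matter for older setups.

```php
<?php

/**
 * Hypothetical helper (not part of daalvand/kafka).
 * Returns the auto.offset.reset value name for the given Kafka version.
 * Assumption: 0.9.0 and later use earliest/latest; older use smallest/largest.
 */
function offsetResetFor(string $kafkaVersion, bool $fromBeginning): string
{
    $legacy = version_compare($kafkaVersion, '0.9.0', '<');

    if ($fromBeginning) {
        return $legacy ? 'smallest' : 'earliest';
    }

    return $legacy ? 'largest' : 'latest';
}

// Possible use with the consumer builder shown above:
// $consumer = Consumer::withAdditionalConfig([
//         'auto.offset.reset' => offsetResetFor('2.8.0', true),
//     ])
//     ->withAdditionalBroker('kafka:9092')
//     ->withConsumerGroup('testGroup')
//     ->withAdditionalSubscription('test-topic')
//     ->build();
```

The setting only takes effect when the consumer group has no committed offset for a partition; once offsets are committed, consumption resumes from them.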