lisq/yii2-kafka

yii2 kafka 生产者、消费者组件

安装: 99

依赖者: 0

建议者: 0

安全性: 0

星标: 0

关注者: 1

分支: 0

开放问题: 0

类型:yii2-extension

dev-main 2022-05-05 11:21 UTC

This package is auto-updated.

Last update: 2024-09-05 16:57:11 UTC


README

yii2 kafka produce、consume

安装

要求

librdkafka
rdkafka.so

安装此扩展的首选方式是通过 composer

运行以下命令之一:

php composer.phar require --prefer-dist lisq/yii2-kafka "*"

或者将以下内容添加到您的 composer.json 文件的 require 部分:

"lisq/yii2-kafka": "*"

使用方法

打开 common/config/main.php,在 components 配置中添加 kafka 组件:

// Application component definition; it is accessed as Yii::$app->kafka.
'kafka' => [
    'class' => \lisq\kafka\Kafka::class,
    // Comma-separated list of broker host:port pairs.
    'hosts' => 'aaa:9092,bbb:9092,ccc:9092',
    // Keys below are librdkafka configuration property names.
    'rdkafka' => [
        "auto.offset.reset" => "latest",
        "enable.auto.commit" => 'false',
        // auth config
        'api.version.request' => 'true',
        'sasl.mechanisms' => 'PLAIN',
        'sasl.username' => '',
        'sasl.password' => '',
        'security.protocol' => 'SASL_SSL',
        // Replace {YOUR_PATH} with the directory that holds the CA certificate.
        'ssl.ca.location' => dirname(dirname(__DIR__)).'/{YOUR_PATH}/ca-cert.pem',
    ],
    // Consumers are keyed by the name given on the command line:
    //   ./yii kafka/consume consumeName
    'consumers' => [
        'consumeName' => [
            // Consumer class; see the FirstConsume example, which implements
            // lisq\kafka\consumer\ConsumerInterface.
            'consume' => FirstConsume::class,
            'topic' => ['topic1'],
            'groupId' => 'php',
        ],
    ],
]

消费者接口

use lisq\kafka\consumer\ConsumerInterface;
use RdKafka\Message;

/**
 * Example consumer: receives each message delivered for the configured topics.
 */
class FirstConsume implements ConsumerInterface
{
    /**
     * Handle one consumed message.
     *
     * @param Message $message the message handed over by the consume command
     */
    public function execute(Message $message)
    {
        // Skip messages that carry an error code instead of a payload
        // (e.g. partition EOF or timeout notifications).
        if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            return;
        }

        try {
            // TODO: put the business logic here, e.g. decode $message->payload.
        } catch (\Exception $e) {
            // Do not swallow failures silently: record them so they can be diagnosed.
            \Yii::error($e->getMessage(), __METHOD__);
        }
    }
}

生产者

// Single publish: send one message to one topic.
// $pollTimeout / $flushTimeout are presumably in milliseconds — confirm against the Kafka component.
Yii::$app->kafka->publishOne($topic, $message, $pollTimeout, $flushTimeout);

// Batch publish: keys are topic names, each value is a list of ['message' => ...] entries.
Yii::$app->kafka->publishBulk([
    'topic1' => [
        ['message' => 'message1'],
        ['message' => 'message2'],
        ['message' => 'message3'],
    ],
    'topic2' => [
        ['message' => 'message1'],
        ['message' => 'message2'],
        ['message' => 'message3'],
    ],
], $pollTimeout, $flushTimeout);

开始消费

./yii kafka/consume consumeName