lisq / yii2-kafka
Kafka producer and consumer components for Yii2
dev-main
2022-05-05 11:21 UTC
Requires
- php: ^7.0
- ext-rdkafka: *
This package is auto-updated.
Last update: 2024-09-05 16:57:11 UTC
README
Yii2 Kafka producer and consumer
Installation
Requirements
librdkafka
rdkafka.so
The preferred way to install this extension is through composer.
Either run
php composer.phar require --prefer-dist lisqorz/yii2-rdkafka "*"
or add
"lisqorz/yii2-rdkafka": "*"
to the require section of your composer.json file.
Usage
Open common/config/main.php and configure the kafka component:
'kafka' => [
    'class' => \lisq\kafka\Kafka::class,
    'hosts' => 'aaa:9092,bbb:9092,ccc:9092',
    'rdkafka' => [
        "auto.offset.reset" => "latest",
        "enable.auto.commit" => 'false',
        // auth config
        'api.version.request' => 'true',
        'sasl.mechanisms' => 'PLAIN',
        'sasl.username' => '',
        'sasl.password' => '',
        'security.protocol' => 'SASL_SSL',
        'ssl.ca.location' => dirname(dirname(__DIR__)).'/{YOUR_PATH}/ca-cert.pem',
    ],
    'consumers' => [
        'consumeName' => [
            'consume' => OffersConsume::class,
            'topic' => ['topic1'],
            'groupId' => 'php'
        ],
    ]
]
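The rdkafka block above writes boolean settings as the strings 'true' and 'false'. librdkafka properties are string-valued, and in PHP a boolean false cast to a string becomes an empty string, so quoting them is the safe choice. The sketch below shows one way to normalise such values before they go into the config. normalizeRdkafkaOptions is a hypothetical helper for illustration and is not part of this package.

```php
<?php
// Hypothetical helper (not provided by yii2-kafka): convert option values
// to the string form that librdkafka properties expect.
function normalizeRdkafkaOptions(array $options): array
{
    $normalized = [];
    foreach ($options as $name => $value) {
        if (is_bool($value)) {
            // (string) false would be '', so map booleans explicitly.
            $normalized[$name] = $value ? 'true' : 'false';
        } else {
            $normalized[$name] = (string) $value;
        }
    }
    return $normalized;
}

$options = normalizeRdkafkaOptions([
    'enable.auto.commit' => false,
    'api.version.request' => true,
    'auto.offset.reset' => 'latest',
]);
```

The resulting array could then be used as the value of the 'rdkafka' key in the component configuration.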
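Consumer interface
use lisq\kafka\consumer\ConsumerInterface;
use RdKafka\Message;

class FirstConsume implements ConsumerInterface
{
    public function execute(Message $message)
    {
        try {
            // Process the message here, e.g. read $message->payload.
        } catch (\Exception $e) {
            // Handle or log the failure.
        }
    }
}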
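The body of execute() is left empty in the skeleton above. As a rough sketch of what could go inside the try block, assuming the messages carry JSON, the helper below decodes a payload and returns null for anything it cannot use. decodePayload is a hypothetical function and not part of this package. The literal 0 stands for RD_KAFKA_RESP_ERR_NO_ERROR so that the snippet runs without the rdkafka extension loaded.

```php
<?php
// Hypothetical helper (not provided by yii2-kafka): decode a JSON payload.
// $err is the message error code; 0 corresponds to RD_KAFKA_RESP_ERR_NO_ERROR.
function decodePayload(int $err, ?string $payload): ?array
{
    if ($err !== 0 || $payload === null) {
        return null; // error event or empty message
    }
    $data = json_decode($payload, true);
    // json_decode returns null for invalid JSON; scalars are rejected too.
    return is_array($data) ? $data : null;
}

// Inside execute(Message $message) this might be called as:
//   $data = decodePayload($message->err, $message->payload);
//   if ($data === null) { return; }
```

Keeping the decoding in a plain function makes it testable without a broker.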
Producer
// single publish
Yii::$app->kafka->publishOne($topic, $message, $pollTimeout, $flushTimeout);
// batch publish
Yii::$app->kafka->publishBulk([
    'topic1' => [
        ['message' => 'message1'],
        ['message' => 'message2'],
        ['message' => 'message3'],
    ],
    'topic2' => [
        ['message' => 'message1'],
        ['message' => 'message2'],
        ['message' => 'message3'],
    ]
], $pollTimeout, $flushTimeout);
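The batch publish call takes an array keyed by topic, where each entry is a list of ['message' => ...] items. When records arrive as a flat list, a small grouping step can build that shape. The sketch below assumes each record is a [topic, message] pair. groupByTopic is a hypothetical helper and not part of this package.

```php
<?php
// Hypothetical helper (not provided by yii2-kafka): group [topic, message]
// pairs into the topic-keyed structure shown in the batch publish example.
function groupByTopic(array $records): array
{
    $batch = [];
    foreach ($records as [$topic, $message]) {
        $batch[$topic][] = ['message' => $message];
    }
    return $batch;
}

$batch = groupByTopic([
    ['topic1', 'message1'],
    ['topic2', 'message1'],
    ['topic1', 'message2'],
]);

// The result could then be passed on, for example:
//   Yii::$app->kafka->publishBulk($batch, $pollTimeout, $flushTimeout);
```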
Start consuming
./yii kafka/consume consumeName