dnkfk / yii2-kafka
yii2的kafka客户端
v1.0.8
2024-09-29 08:59 UTC
Requires
- php: >=7.4
- ext-json: *
- ext-rdkafka: *
- yiisoft/yii2: ~2.0.49
README
需求
1、yii2
2、php >= 7.4
3、RdKafka扩展,扩展安装如下
通过git源码编译安装:
安装librdkafka:
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install
安装php-rdkafka扩展:
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php7.4/bin/php-config
make && make install
ubuntu系统环境下安装:
sudo apt-get install librdkafka-dev
pecl install rdkafka
php.ini配置:
extension = rdkafka.so
使用docker镜像文件构建kafka服务,示例:tmp-docker-compose.yml
在yii2中使用kafka
composer require dnkfk/yii2-kafka
消费者组件配置
component/kafka.php
return [ 'class' => \Dnkfk\KafkaConnection::class, 'conn' => ['localhost:9092'], //连接主机配置 'topics' => ['test_topic'], //所有需要订阅的主题 'binding' => [ [ 'consumer' => 'test_consumer', //消费者名称 'group' => 'test_group', //消费组 'topics' => ['test_topic'], //订阅主题 'callback' => TestConsumer::class, //消费者回调 ] ] ];
发布消息到主题代码示例
直接发布消息
$msg = ['msg' => 'test']; //$msg可以是数组或字符串 \Yii::$app->kafka->produce($msg, 'test_topic');
批量发布消息
//构建生产者主题 $kafka = \Yii::$app->kafka->buildProducerTopic('test_topic'); //批量发布消息到主题 for ($i = 0; $i < 10; $i++) { $kafka->produceMsg("message {$i}"); } //等待消息全部发布完成 $kafka->wait();
启动消费者
php yii kafka/consume test_consumer
消费者消费代码示例
use Dnkfk\ConsumerInterface; use Dnkfk\Message; class TestConsumer implements ConsumerInterface { /** * 执行消费 * * @param Message $message * @return void */ public function execute(Message $message) { $payload = json_decode($message->getPayload(), true); var_dump($payload); //输出:['msg' => 'test'] } }