dnkfk/yii2-kafka

yii2的kafka客户端

安装次数: 1

依赖关系: 0

建议者: 0

安全性: 0

星标: 2

关注者: 1

分支: 0

开放问题: 0

类型:yii2-extension

v1.0.8 2024-09-29 08:59 UTC

This package is auto-updated.

Last update: 2024-09-29 09:04:41 UTC


README

需求

1、yii2

2、php >= 7.4

3、RdKafka扩展,扩展安装如下

    通过git源码编译安装:

    安装librdkafka:
    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    ./configure
    make && make install
    
    安装php-rdkafka扩展:
    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure --with-php-config=/usr/local/php7.4/bin/php-config
    make && make install

    ubuntu系统环境下安装:

    sudo apt-get install librdkafka-dev
    pecl install rdkafka
    
    php.ini配置:
    extension = rdkafka.so

使用docker镜像文件构建kafka服务,示例:tmp-docker-compose.yml

在yii2中使用kafka

 composer require dnkfk/yii2-kafka

消费者组件配置

component/kafka.php

    return [
        'class'   => \Dnkfk\KafkaConnection::class,
        'conn'    => ['localhost:9092'], //连接主机配置
        'topics'  => ['test_topic'], //所有需要订阅的主题
        'binding' => [
            [
                'consumer' => 'test_consumer', //消费者名称
                'group'    => 'test_group', //消费组
                'topics'   => ['test_topic'], //订阅主题
                'callback' => TestConsumer::class, //消费者回调
            ]
        ]
    ];

发布消息到主题代码示例

直接发布消息

    $msg = ['msg' => 'test']; //$msg可以是数组或字符串
    \Yii::$app->kafka->produce($msg, 'test_topic');

批量发布消息

    //构建生产者主题
    $kafka = \Yii::$app->kafka->buildProducerTopic('test_topic');
    //批量发布消息到主题
    for ($i = 0; $i < 10; $i++) {
        $kafka->produceMsg("message {$i}");
    }
    //等待消息全部发布完成
    $kafka->wait();

启动消费者

php yii kafka/consume test_consumer

消费者消费代码示例

    use Dnkfk\ConsumerInterface;
    use Dnkfk\Message;
    
    class TestConsumer implements ConsumerInterface
    {
    
        /**
         * 执行消费
         *
         * @param Message $message
         * @return void
         */
        public function execute(Message $message)
        {
            $payload = json_decode($message->getPayload(), true);
    
            var_dump($payload); //输出:['msg' => 'test']
        }
    }