thiagobrauer/laravel-kafka

此包最新版本(dev-main)没有提供许可证信息。

dev-main 2021-08-18 11:58 UTC

This package is auto-updated.

Last update: 2024-09-18 19:40:30 UTC


README

此包基于 anam-hossain 的示例

安装

  1. 安装 librdkafka 库

  2. 安装 php-rdkafka PECL 扩展

  3. 使用 composer 安装此包

composer require thiagobrauer/laravel-kafka
  1. 发布包的配置文件
php artisan vendor:publish --provider="ThiagoBrauer\LaravelKafka\ServiceProvider"
  1. 将以下属性添加到您的 .env 文件中,根据需要更改值
KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1

您可以使用逗号(,)作为分隔符设置多个生产者服务器、消费者服务器和消费者主题。

用法

生产者

要发送消息,只需使用 KafkaProducer.php

use ThiagoBrauer\LaravelKafka\KafkaProducer;

...

$producer new KafkaProducer()
$producer->setTopic('topic1')->send('message');

消费者

首先,您需要创建一个类来处理接收到的消息。该类必须扩展 ThiagoBrauer\LaravelKafka\Handlers\MessageHandler 并实现 handle 方法,如下例所示

<?php

namespace App\Kafka\Handlers;

use ThiagoBrauer\LaravelKafka\Handlers\MessageHandler;
use RdKafka\Message;

class KafkaMessageHandler extends MessageHandler
{
    public function handle(Message $message)
    {
        var_dump($message);
    }
}

然后,将您的类添加到 config/laravel_kafka.php 文件中的 message_handlers 部分,按主题组织

...

'message_handlers' => [
    'topic1' => [
        App\Kafka\Handlers\KafkaMessageHandler::class   
    ],
    'topic2' => [
        App\Kafka\Handlers\KafkaMessageHandler::class   
    ]        
]

...

并运行 php artisan config:cache

之后,您就可以开始使用消费者了

php artisan kafka:consume

您可以使用 .env 文件中的变量或命令行选项来定义消费者配置

KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_PRODUCER_COMPRESSION=snappy
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1
KAFKA_CONSUMER_COMMIT_ASYNC=true
KAFKA_CONSUMER_TIMEOUT_MS=120000
KAFKA_CONSUMER_AUTO_OFFSET_RESET=earliest
KAFKA_CONSUMER_AUTO_COMMIT=true
php artisan kafka:consume --servers=kafka:9092
php artisan kafka:consume --topics=inventories
php artisan kafka:consume --group_id=group1
php artisan kafka:consume --timeout_ms=120000
php artisan kafka:consume --auto_offset_reset=earliest
php artisan kafka:consume --commit_async
php artisan kafka:consume --auto_commit
php artisan kafka:consume --once // consume only one message