nirmalsharma / laravel-kafka-php
0.0.3
2023-01-10 07:15 UTC
Requires
- php: >=7.2
This package is auto-updated.
Last update: 2024-09-10 11:06:20 UTC
README
安装 Kafka 包装器
composer require nirmalsharma/laravel-kafka-php
示例
在代码中使用 Kafka 生产者。
在 web.php 中添加以下内容:
use KafkaProducer; $topic = "kafka-topic"; $data = [ "user_ref" => "usr.123456", "message" => "Hello World" ]; $key = "usr.123456"; // Optional, Default null $headers = [ "ContentType" => "application/json", "Timezone" => "GMT +05:30" ]; // Optional KafkaProducer::push($topic, $data, $key, $headers);
在代码中使用 Kafka 消费者。
控制台 KafkaConsumer 代码
namespace App\Console\Commands; use App\Handlers\TestHandler; use Illuminate\Console\Command; use KafkaConsumer; class TestTopicConsumer extends Command { protected $signature = 'kafka:test-consume {--partition=} {--consumer-group=} {--topic=} {--dlq-topic=}'; protected $description = 'Kafka consumer!!'; public function handle(): void { KafkaConsumer::createConsumer(new TestHandler); } public function setKafkaConfig(){ $partition = $this->option('partition'); if( $partition != null){ config([ "kafka.partition" => $partition ]); } $consumer_group_id = $this->option('consumer-group'); if( !empty($consumer_group_id)){ config([ "kafka.consumer_group_id" => $consumer_group_id ]); } $topic = $this->option('topic'); if( !empty($topic)){ config([ "kafka.topic" => $topic ]); } $dlq_topic = $this->option('dlq-topic'); if( !empty($dlq_topic)){ config([ "kafka.dlq_topic" => $dlq_topic ]); } } } TestHandler.php ----------------- namespace App\Handlers; use Illuminate\Support\Facades\Log; class TestHandler { public function __invoke( $message) { dump([ "partition" => $message['raw']->partition, "key" => $message['key'] ]); } }
要开始监听消息,请运行以下命令
php artisan kafka:test-consume --consumer-group=test-local --topic=demo-topic --dlq-topic=demo-dlq
您可以使用 KafkaConsumerException 在业务逻辑或消费者处理逻辑中强制发送消息到死信队列(DLQ)
use Nirmalsharma\LaravelKafkaPhp\Exceptions\KafkaConsumerException;
throw new KafkaConsumerException('Not valid.');
环境变量
要运行此程序,您需要将以下环境变量添加到您的 .env 文件中。配置参考:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
IS_KAFKA_ENABLED= // Default: 1
KAFKA_BROKERS=
KAFKA_DEBUG= // Default: false
KAFKA_SSL_PROTOCOL= // Default: plaintext
KAFKA_COMPRESSION_TYPE= // Default: none
KAFKA_IDEMPOTENCE= // Default: false
KAFKA_CONSUMER_GROUP_ID=
KAFKA_OFFSET_RESET= // Default: latest
KAFKA_AUTO_COMMIT= // Default: true
KAFKA_DEBUG=false
KAFKA_DLQ_TOPIC=
KAFKA_TOPIC=
作者
许可证
特性
- 轻量级 Kafka 包装器
- 易于在 PHP 中生产 Kafka 事件和消费。