s008nyx / kafka-bus
Kafka bus integration for Laravel/Lumen
0.2.1
2021-10-15 07:30 UTC
Requires
- php: ^7.2 || ^8.0
- ext-json: *
- ext-rdkafka: *
- illuminate/config: ^8.0 || ^7.0
- illuminate/contracts: ^8.0 || ^7.0
- illuminate/support: ^8.0 || ^7.0
Requires (Dev)
- mockery/mockery: 1.*
- phpunit/phpunit: 8.*
README
Installation
Install the package
composer require s008nyx/kafka-bus
For Lumen
- Open your bootstrap/app.php file and add this line before the "Register Container Bindings" section
$app->configure('kafka-bus');
- Add this line in the "Register Service Providers" section
$app->register(\KafkaBus\KafkaBusServiceProvider::class);
Add environment variables
KAFKA_BROKERS="kafka-node01:9093,kafka-node02:9093"
KAFKA_AUTOCOMMIT=true
KAFKA_GROUP_ID="myGroup"
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISMS=SCRAM-SHA-512
KAFKA_SASL_PASSWORD=password
KAFKA_SASL_USERNAME=username
KAFKA_SSL_CA_LOCATION=/path/to/ca.crt
KAFKA_SSL_CERTIFICATE_LOCATION=/path/to/chain.crt
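This README does not show the package's config file, so how it reads these variables is not documented here. As a rough sketch only, the variables appear to line up with the standard librdkafka configuration properties shown below. The helper name `kafkaSettingsFromEnv` and the mapping itself are assumptions for illustration, not part of kafka-bus.

```php
<?php

/**
 * Hypothetical helper (not part of kafka-bus): translate the environment
 * variables listed above into librdkafka configuration property names.
 *
 * @param array<string, string> $env Variable name => raw string value
 * @return array<string, string> librdkafka property => value
 */
function kafkaSettingsFromEnv(array $env): array
{
    // Left: variable name from this README. Right: librdkafka property (assumed mapping).
    $map = [
        'KAFKA_BROKERS' => 'metadata.broker.list',
        'KAFKA_AUTOCOMMIT' => 'enable.auto.commit',
        'KAFKA_GROUP_ID' => 'group.id',
        'KAFKA_SECURITY_PROTOCOL' => 'security.protocol',
        'KAFKA_SASL_MECHANISMS' => 'sasl.mechanisms',
        'KAFKA_SASL_USERNAME' => 'sasl.username',
        'KAFKA_SASL_PASSWORD' => 'sasl.password',
        'KAFKA_SSL_CA_LOCATION' => 'ssl.ca.location',
        'KAFKA_SSL_CERTIFICATE_LOCATION' => 'ssl.certificate.location',
    ];

    $settings = [];
    foreach ($map as $variable => $property) {
        // Skip variables that are unset or empty, so librdkafka defaults apply.
        if (isset($env[$variable]) && $env[$variable] !== '') {
            $settings[$property] = $env[$variable];
        }
    }

    return $settings;
}

// Example with only three of the variables set.
$settings = kafkaSettingsFromEnv([
    'KAFKA_BROKERS' => 'kafka-node01:9093,kafka-node02:9093',
    'KAFKA_GROUP_ID' => 'myGroup',
    'KAFKA_SECURITY_PROTOCOL' => 'SASL_SSL',
]);
```

With ext-rdkafka loaded, each resulting pair could be passed to `RdKafka\Conf::set($property, $value)`. Whether kafka-bus does exactly this internally is not confirmed by this README.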
Usage
Create a handler
MyHandler.php
<?php

use KafkaBus\Error;
use KafkaBus\KafkaHandler;
use RdKafka\Message;

class MyHandler implements KafkaHandler
{
    /**
     * Topics list
     * @return array
     */
    public function getTopics(): array
    {
        return ['myTopic'];
    }

    /**
     * Processing success message
     * @param Message $message
     * @return bool
     */
    public function process(Message $message): bool
    {
        // Do something

        // A bool must be returned, as the signature declares
        return true;
    }

    /**
     * Processing fail message
     * @param Error $error
     * @return bool
     */
    public function error(Error $error): bool
    {
        // Do something

        // A bool must be returned, as the signature declares
        return true;
    }
}
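The body of `process()` is left open above. Since the package requires ext-json, messages are plausibly JSON, though this README does not say so. Under that assumption, a minimal sketch of a decoding step might look like the following. `decodePayload` is a hypothetical helper, not part of kafka-bus.

```php
<?php

/**
 * Hypothetical helper: decode a JSON message body into an array.
 * Returns null when the payload is missing, is not valid JSON,
 * or does not decode to an array.
 */
function decodePayload(?string $payload): ?array
{
    if ($payload === null || $payload === '') {
        return null;
    }

    // Decode JSON objects as associative arrays.
    $data = json_decode($payload, true);

    // json_last_error() keeps this compatible with PHP 7.2.
    if (json_last_error() !== JSON_ERROR_NONE || !is_array($data)) {
        return null;
    }

    return $data;
}

$order = decodePayload('{"id": 42, "status": "paid"}');
$broken = decodePayload('{"id": ');
```

Inside `process()`, this could be called as `decodePayload($message->payload)`, since `RdKafka\Message` exposes the raw body in its `payload` property. What the handler should return for an undecodable message depends on how kafka-bus interprets the bool, which is not described here.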
Create an artisan command
KafkaConsumer.php
<?php

namespace App\Console\Commands;

use KafkaBus\Consumer;
use Illuminate\Console\Command;

class KafkaConsumer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'kafka:consume';

    /**
     * @param Consumer $consumer
     */
    public function handle(Consumer $consumer)
    {
        try {
            // MyHandler must be resolvable from this namespace (import it with `use` if it lives elsewhere)
            $consumer->consume(new MyHandler());
        } catch (\Exception $e) {
            $this->error($e->getMessage());
        }
    }
}
Run the command
php artisan kafka:consume