daalvand / kafka
This package is used to produce to and consume from Kafka.
1.0.0
2021-09-24 10:20 UTC
Requires
- php: ^7.4|^8.0
- ext-rdkafka: *
README
- This package provides Kafka producing and consuming for Laravel.
Installation
Install Kafka
Apache Kafka is required by many parts of our ecosystem.
- Install librdkafka, the Apache Kafka C/C++ client library.
For Ubuntu:
apt install librdkafka-dev
For CentOS:
yum install librdkafka-devel
- Then build the PHP extension from source:
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install
- Then add the extension to php.ini:
extension=rdkafka.so
- Then restart the php-fpm service:
service php-fpm restart
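Before moving on, it can help to confirm that PHP actually sees the extension. The check below is plain PHP and not part of this package; run it with the same PHP binary that your application uses.
<?php

// Sanity check: is the rdkafka extension loaded by this PHP binary?
var_dump(extension_loaded('rdkafka')); // bool(true) if the install succeeded
var_dump(phpversion('rdkafka'));       // extension version string, or false if missing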
Install the package
1. Run
composer require daalvand/kafka
2. Publish the provider
Laravel
php artisan vendor:publish --provider="Daalvand\Kafka\KafkaServiceProvider"
Lumen
- Add the service provider to the bootstrap/app.php file:
<?php
$app->register(Daalvand\Kafka\KafkaServiceProvider::class);
- Copy the config file from /vendor/daalvand/kafka/src/config to the /config directory, then enable it in the /bootstrap/app.php file:
<?php
$app->configure("kafka");
Usage
Producer
<?php

use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;

$producer = Producer::withAdditionalBroker('localhost:9092')
    ->build();

$message = (new ProducerMessage('topic-name', 0))
    ->withKey('test-key')
    ->withBody('some test message payload')
    ->withHeaders(['header' => 'value']);

$producer->produce($message);
$producer->flush(-1);
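A built producer can be reused for many messages and flushed once at the end. The loop below is only a sketch assembled from the API shown above (the Producer facade, ProducerMessage, produce() and flush()); the topic, partition, keys, and payloads are placeholders.
<?php

use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;

// Build the producer once and reuse it for the whole batch.
$producer = Producer::withAdditionalBroker('localhost:9092')->build();

foreach (['first', 'second', 'third'] as $i => $payload) {
    // Partition 0 is used here only for illustration.
    $message = (new ProducerMessage('topic-name', 0))
        ->withKey("key-{$i}")
        ->withBody($payload)
        ->withHeaders(['header' => 'value']);

    $producer->produce($message);
}

// Flush once after the batch; -1 blocks until all messages are delivered.
$producer->flush(-1);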
Consumer
<?php

use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerConsumeException;
use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;

$consumer = Consumer::withAdditionalConfig([
        'compression.codec'       => 'lz4',
        'auto.commit.interval.ms' => 500
    ])
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();

        // your business logic
        $consumer->commit($message);
    } catch (ConsumerTimeoutException $e) {
        // no messages were read in a given time
    } catch (ConsumerEndOfPartitionException $e) {
        // only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerConsumeException $e) {
        // failed
    }
}
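If the consumer runs as a long-lived CLI worker, you may want it to stop cleanly on SIGTERM or SIGINT. The sketch below assumes the pcntl extension is available (it is not required by this package) and reuses only the consumer API shown above; handle the other exceptions exactly as in the previous example.
<?php

use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;

$consumer = Consumer::withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();
$consumer->subscribe();

// Flip a flag when the process is asked to stop.
$running = true;
pcntl_async_signals(true);
pcntl_signal(SIGTERM, static function () use (&$running) { $running = false; });
pcntl_signal(SIGINT,  static function () use (&$running) { $running = false; });

while ($running) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (ConsumerTimeoutException $e) {
        // No message within the timeout; loop again and re-check $running.
    }
}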
The auto.offset.reset option accepts (largest, smallest) for Kafka versions before 0.9 and (earliest, latest) for 0.9 and later.
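For example, to make a new consumer group start from the beginning of a topic on Kafka 0.9 or later, the option can be passed through withAdditionalConfig just like the other settings in the consumer example above (broker, group, and topic names are placeholders):
<?php

use Daalvand\Kafka\Facades\Consumer;

// 'earliest' / 'latest' on Kafka >= 0.9; use 'smallest' / 'largest' on older brokers.
$consumer = Consumer::withAdditionalConfig(['auto.offset.reset' => 'earliest'])
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('new-group')
    ->withAdditionalSubscription('test-topic')
    ->build();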