lireincore / kafka-rest-client
Kafka rest客户端
dev-master
2020-05-21 19:20 UTC
Requires
- php: >=7.1
- ext-json: *
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0
- psr/log: ^1.0
This package is auto-updated.
Last update: 2024-09-22 09:29:57 UTC
README
关于
适用于confluent rest proxy v2的Kafka rest客户端
安装
将"lireincore/kafka-rest-client": "dev-master"
包添加到你的composer.json
文件中的require
部分
或者
$ php composer.phar require lireincore/kafka-rest-client dev-master
使用方法
use Psr\Log\LoggerInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\RequestFactoryInterface; use LireinCore\KafkaRestClient\Client; use LireinCore\KafkaRestClient\Producer; use LireinCore\KafkaRestClient\Consumer; use LireinCore\KafkaRestClient\KafkaRestException; use LireinCore\KafkaRestClient\Request\SendMessagesRequest; use LireinCore\KafkaRestClient\Request\ConsumerCreateRequest; use LireinCore\KafkaRestClient\Request\ConsumerAssignmentRequest; use LireinCore\KafkaRestClient\Request\GetMessagesRequest; //$client implements Psr\Http\Client\ClientInterface //$requestFactory implements Psr\Http\Message\RequestFactoryInterface //$streamFactory implements Psr\Http\Message\StreamFactoryInterface //$logger implements Psr\Log\LoggerInterface $kafkaClient = new Client('rest-host:8082', $client, $requestFactory, $streamFactory, $logger); //produce message $producer = new Producer($kafkaClient); $request = (new SendMessagesRequest('test_topic')) ->addRecord('test value'); $response = $producer->send($request); /***************************************************************/ //consume message $consumer = new Consumer($kafkaClient); $consumerCreateRequest = new ConsumerCreateRequest('test_group'); $consumerCreateResponse = $consumer->create($consumerCreateRequest); $consumerAssignmentRequest = (new ConsumerAssignmentRequest()) ->addPartition('test_topic', 0); $consumer->assign($consumerAssignmentRequest, $consumerCreateResponse); $getMessagesRequest = new GetMessagesRequest(); $messages = $consumer->pool($getMessagesRequest, $consumerCreateResponse); if ($messages) { //...custom process messages //commit last offsets $consumerCommitOffsetsRequest = $consumer->createConsumerCommitOffsetsRequest($messages); try { $consumer->commit($consumerCommitOffsetsRequest, $consumerCreateResponse); } catch (KafkaRestException $ex) { var_dump($ex->getMessage()); } } $consumer->delete($consumerCreateResponse);
许可证
MIT许可证(MIT)。更多信息请参阅许可证文件。