ytake / php-ksql
KSQL 是 Apache Kafka 的流式 SQL 引擎。PHP 的 REST 客户端
2.0.0
2020-07-16 01:52 UTC
Requires
- php: ^7.1
- ext-curl: *
- ext-json: *
- fig/http-message-util: ^1.1.4
- guzzlehttp/guzzle: ^6.5.5
Requires (Dev)
- monolog/monolog: ^1.23
- pdepend/pdepend: ^2.5.2
- phploc/phploc: *
- phpmd/phpmd: @stable
- phpunit/phpunit: ^7.1.4
- satooshi/php-coveralls: ^2.0.0
- sebastian/phpcpd: *
- sensiolabs/security-checker: ^4.1.8
README
Apache kafka / Confluent KSQL REST 客户端 for php
什么是 KSQL?
KSQL 是 Apache Kafka 的流式 SQL 引擎。
安装
需要 >= PHP 7.1
$ composer require ytake/php-ksql
用法
请求预设
获取命令状态
<?php use Ytake\KsqlClient\RestClient; use Ytake\KsqlClient\Query\CommandStatus; use Ytake\KsqlClient\Computation\CommandId; $client = new RestClient( "http://localhost:8088" ); $result = $client->requestQuery( new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create')) )->result();
获取状态
<?php use Ytake\KsqlClient\RestClient; use Ytake\KsqlClient\Query\Status; $client = new RestClient( "http://localhost:8088" ); $result = $client->requestQuery(new Status())->result();
获取 KSQL 服务器信息
<?php use Ytake\KsqlClient\RestClient; use Ytake\KsqlClient\Query\ServerInfo; $client = new RestClient( "http://localhost:8088" ); $result = $client->requestQuery(new ServerInfo())->result();
查询 KSQL
<?php use Ytake\KsqlClient\RestClient; use Ytake\KsqlClient\Query\Ksql; $client = new RestClient( "http://localhost:8088" ); $result = $client->requestQuery( new Ksql('DESCRIBE users_original;') )->result();
流式响应客户端
<?php use Ytake\KsqlClient\StreamClient; use Ytake\KsqlClient\Query\Stream; use Ytake\KsqlClient\StreamConsumable; use Ytake\KsqlClient\Entity\StreamedRow; $client = new StreamClient( "http://localhost:8088" ); $result = $client->requestQuery( new Stream( 'SELECT * FROM testing', new class() implements StreamConsumable { public function __invoke(StreamedRow $row) { // stream response consumer } } ) )->result();