dev-this / ksqldb-php
Asynchronous ksqlDB client for PHP 8
dev-master
2021-08-13 15:34 UTC
Requires
- php: >=8.0
- amphp/http-client: ^4.6
Requires (Dev)
- phpunit/phpunit: ^9.0
This package is auto-updated.
Last update: 2024-09-13 22:43:41 UTC
README
Currently under development. API stability is not guaranteed until v1.
Requires PHP 8
composer require dev-this/ksqldb-php
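Features
- Asynchronous operation (thanks to amphp/amp!)
- All client features expected by Confluent:
  - Push and pull query support (HTTP/2) (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/)
  - Terminating push queries
  - Inserting new rows of data into existing ksqlDB streams
  - Listing existing streams, tables, topics and queries
  - Creating and dropping streams and tables
  - Terminating persistent queries
- Native Amp\Promise functionality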
Usage
Creating a client
A factory is available for creating the client.
DevThis\KsqlDB\Factory\ClientFactory::create(string $hostname): DevThis\KsqlDB\Client
No HTTP connection is established until a client command is called.
Usage
$hostname = 'https://:8088';
$client = (new DevThis\KsqlDB\Factory\ClientFactory())->create($hostname);
Streaming callback
A streaming query requires a callback class that implements the callback interface. Establishing the stream intentionally blocks until the header (which includes the query ID) has been received.
DevThis\KsqlDB\Client::stream(Statement $statement, StreamCallback $callback): Amp\Promise
The callback class must implement StreamCallback
interface StreamCallback
{
    // Invoked once, at the start of the stream.
    // StreamHeader has getters for the query ID, and for the column names and their data types.
    public function onHeader(StreamHeader $header): void;

    // Invoked on each new event.
    // StreamEvent is an \ArrayObject.
    public function onEvent(StreamEvent $event): void;
}
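To make the relationship between the two callback methods concrete, here is a minimal, self-contained sketch of how a header and subsequent rows could be dispatched to a callback. It is not this library's implementation. It assumes the newline-delimited JSON format of ksqlDB's `/query-stream` endpoint, where the first line is a header object (`queryId`, `columnNames`, `columnTypes`) and every later line is a JSON array of column values. `SketchCallback`, `CollectingCallback` and `dispatchLines` are hypothetical names, and plain arrays stand in for `StreamHeader` and `StreamEvent`.

```php
<?php
declare(strict_types=1);

// Hypothetical, simplified stand-in for the library's StreamCallback:
// plain arrays are used instead of StreamHeader / StreamEvent.
interface SketchCallback
{
    public function onHeader(array $header): void;
    public function onEvent(array $row): void;
}

// Collects rows and keys each value by the column names from the header.
final class CollectingCallback implements SketchCallback
{
    public ?string $queryId = null;
    public array $rows = [];
    private array $columns = [];

    public function onHeader(array $header): void
    {
        $this->queryId = $header['queryId'] ?? null;
        $this->columns = $header['columnNames'] ?? [];
    }

    public function onEvent(array $row): void
    {
        // Pair each positional value with its column name.
        $this->rows[] = array_combine($this->columns, $row);
    }
}

// Assumed wire format: first non-empty line is the header object,
// every following non-empty line is one row.
function dispatchLines(iterable $lines, SketchCallback $callback): void
{
    $headerSeen = false;
    foreach ($lines as $line) {
        $line = trim($line);
        if ($line === '') {
            continue;
        }
        $decoded = json_decode($line, true, 512, JSON_THROW_ON_ERROR);
        if (!$headerSeen) {
            $callback->onHeader($decoded);
            $headerSeen = true;
            continue;
        }
        $callback->onEvent($decoded);
    }
}

// Sample data invented for illustration only.
$lines = [
    '{"queryId":"transient_1","columnNames":["ID","AMOUNT"],"columnTypes":["STRING","DOUBLE"]}',
    '["a1",12.5]',
    '["b2",3.0]',
];
$callback = new CollectingCallback();
dispatchLines($lines, $callback);
```

In the real client the lines would arrive incrementally over HTTP/2 rather than from an in-memory array, which is why `onHeader` fires exactly once before any `onEvent` call.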
Usage
use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$transactionStatement = new Statement("SELECT * FROM transactions EMIT CHANGES;");

$transactionHandler = new class implements StreamCallback {
    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        // do something with $event...
    }
};

$stream = $client->stream($transactionStatement, $transactionHandler);

// Query ID
echo $stream->getQueryId();

// Terminate the query
$client->terminate($stream);

// Wait indefinitely.
// $promise is not defined in this snippet; it is assumed to be the Amp\Promise tracking the stream.
\Amp\Promise\wait($promise);
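Executing statements
Executing a statement is similar to streaming one. The main difference is that an executed statement is not a continuous operation: it returns a single response.
DevThis\KsqlDB\Client::execute(Statement $statement): ArrayObject
The ArrayObject will contain the response.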
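For context, a one-shot statement of this kind maps onto ksqlDB's `/ksql` REST endpoint, which accepts a JSON body with a `ksql` string and a `streamsProperties` object. The sketch below only builds such a body; whether this client builds its requests the same way is an assumption, and `buildKsqlRequestBody` is a hypothetical helper, not part of the library.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: builds the JSON body for a POST to ksqlDB's /ksql endpoint.
function buildKsqlRequestBody(string $sql, array $streamsProperties = []): string
{
    // ksqlDB statements are terminated with a semicolon; add one if missing.
    $sql = rtrim($sql);
    if (!str_ends_with($sql, ';')) {
        $sql .= ';';
    }

    return json_encode(
        [
            'ksql' => $sql,
            // Cast to object so an empty property list encodes as {} rather than [].
            'streamsProperties' => (object) $streamsProperties,
        ],
        JSON_THROW_ON_ERROR
    );
}

$body = buildKsqlRequestBody('SHOW STREAMS');
echo $body, "\n";
```

The server replies once with a JSON document describing the result, which matches the single ArrayObject response described above.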
Functional example
An asynchronous application that eats its own dog food: it consumes the very events it creates.
use Amp\Loop;
use DevThis\KsqlDB\Factory\ClientFactory;
use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$client = (new ClientFactory())->create('https://:8088');

// No trailing comma after the last column definition.
$createStatement = new Statement("CREATE STREAM cool_data (
    id VARCHAR KEY,
    message VARCHAR,
    timestamp VARCHAR
) WITH (
    kafka_topic = 'cool_data',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'timestamp',
    timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss'
);");

$streamStatement = new Statement("SELECT * FROM cool_data EMIT CHANGES;");

$coolDataCallback = new class implements StreamCallback {
    // Column positions within each event, in the order declared above.
    private const SCHEMA_ID = 0;
    private const SCHEMA_MESSAGE = 1;
    private const SCHEMA_TIMESTAMP = 2;

    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
        echo sprintf(">Columns: %s", print_r($header->getColumns(), true));
        echo "--------------------\n";
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        echo sprintf(">ID: %s\n", $event[self::SCHEMA_ID]);
        echo sprintf(">Message: %s\n", $event[self::SCHEMA_MESSAGE]);
        echo sprintf(">Timestamp: %s\n", $event[self::SCHEMA_TIMESTAMP]);
    }
};

// Create the stream before querying it.
$client->execute($createStatement);

// Run event loop
// https://amphp.org/amp/event-loop/
Loop::run(function () use ($client, $streamStatement, $coolDataCallback) {
    $stream = $client->streamAsync($streamStatement, $coolDataCallback);

    // Runs every 1000 ms.
    Loop::repeat(1000, static function () {
        // insert into stream example.
    });

    // Terminate stream after 100 seconds.
    Loop::delay(1000 * 100, static function () use ($client, $stream) {
        $client->terminateStream($stream->getQueryId());
    });
});