dev-this/ksqldb-php

PHP 8 的异步 ksqlDB 客户端

dev-master 2021-08-13 15:34 UTC

This package is auto-updated.

Last update: 2024-09-13 22:43:41 UTC


README

目前处于开发中。API 的稳定性在 v1 版本之前不能保证。

需要 PHP 8

composer require dev-this/ksqldb-php

特性

使用方法

创建客户端

有一个用于创建客户端的工厂可用

DevThis\KsqlDB\ClientFactory::create(string $hostname): DevThis\KsqlDB\Client

直到调用客户端命令之前,不会建立 HTTP 连接。

使用方法

$hostname = 'https://:8088';

$client = (new DevThis\KsqlDB\Factory\ClientFactory())->create($hostname);

流式回调

流式查询需要一个实现了回调接口的回调类。建立流是故意阻塞的,直到接收到头部(包括查询 ID)。

DevThis\KsqlDB\Factory\ClientFactory::stream(Statement $statement, StreamCallback $callback): Amp\Promise

回调类必须实现 StreamCallback

interface StreamCallback {
    // Invoked once, at the start of the stream
    // StreamHeader has getters for the query ID, and column names and their data types.
    public function onHeader(StreamHeader $header): void;
    
    // OnEvent will be invoked on each new event
    // StreamEvent is an \ArrayObject
    public function onEvent(StreamEvent $event): void;
}

使用方法

use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement

$transactionStatement = new Statement("SELECT * FROM transactions EMIT CHANGES;");

$transactionHandler = new class implements StreamCallback {
    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        // do something with $event...
    }
}

$stream = $client->stream($transactionStatement, $transactionHandler);
// Query ID
echo $stream->getQueryId();

// Terminate the query
$client->terminate($stream);

// wait indefinitely
\Amp\Promise\wait($promise);

执行语句

执行语句与流式语句类似。主要区别是执行语句不是连续操作。

DevThis\KsqlDB\Client::execute(Statement $statement): ArrayObject

ArrayObject 将包含响应。

功能示例

异步应用程序,将自食其果。消耗它创建的非常事件

use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\Factory\ClientFactory;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$client = (new ClientFactory())->create('https://:8088');

$createStatement = new Statement("CREATE STREAM cool_data (
    id VARCHAR KEY,
    message VARCHAR,
    timestamp VARCHAR,
) WITH (
    kafka_topic = 'cool_data',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'timestamp',
    timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss'
);");
$streamStatement = new Statement("SELECT * FROM cool_data EMIT CHANGES;");
$coolDataCallback = new class implements \DevThis\KsqlDB\Interfaces\StreamCallback {
    private const SCHEMA_ID = 0;
    private const SCHEMA_MESSAGE = 1;
    private const SCHEMA_TIMESTAMP = 2;

    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
        echo sprintf(">Columns: %s", print_r($header->getColumns(), true));
        echo "--------------------\n";
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        echo sprintf(">ID: %s\n", $event[static::SCHEMA_ID]);
        echo sprintf(">Message: %s\n", $event[static::SCHEMA_MESSAGE]);
        echo sprintf(">Timestamp: %s\n", $event[static::SCHEMA_TIMESTAMP]);
    }
};

$stream = $client->execute($createStatement);

// Run event loop
// https://amphp.org/amp/event-loop/
\Amp\Loop::run(function () use ($client) {
    $stream = $client->streamAsync($streamStatement, $coolDataCallback);

    Loop::repeat(1000, static function() {
        // insert into stream example.
    });
    
    // Terminate stream after 100 seconds.
    Loop::delay(1000 * 100, static function () use ($client, $stream) {
        $client->terminateStream($stream->getQueryId());
    });
});

替代方案