ytake/php-ksql

KSQL 是 Apache Kafka 的流式 SQL 引擎。PHP 的 REST 客户端

维护者

详细信息

github.com/ytake/php-ksql

源代码

问题

资助包维护!
ytake

2.0.0 2020-07-16 01:52 UTC

This package is auto-updated.

Last update: 2024-08-25 20:34:57 UTC


README

Apache kafka / Confluent KSQL REST 客户端 for php

Build Status Coverage Status Scrutinizer Code Quality StyleCI

License Latest Version Total Downloads

什么是 KSQL?

KSQL 是 Apache Kafka 的流式 SQL 引擎。

什么是 KSQL?

安装

需要 >= PHP 7.1

$ composer require ytake/php-ksql

用法

请求预设

语法参考

获取命令状态

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();

获取状态

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();

获取 KSQL 服务器信息

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();

查询 KSQL

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();

流式响应客户端

<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row) 
            {
                // stream response consumer
            }
        }    
    )
)->result();