istyle-inc/php-ksql

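This package is abandoned and no longer maintained. The author suggests using the ytake/php-ksql package instead.

KSQL is the streaming SQL engine for Apache Kafka. This package is a REST client for PHP.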

2.0.0 2020-07-16 01:52 UTC

This package is auto-updated.

Last update: 2021-09-25 13:48:54 UTC


README

Apache Kafka / Confluent KSQL REST Client for PHP

What is KSQL

KSQL is the streaming SQL engine for Apache Kafka.

Install

Requires PHP >= 7.1

$ composer require ytake/php-ksql

Usage

Request Presets

Ytake\KsqlClient\Query\CommandStatus
Ytake\KsqlClient\Query\Status
Ytake\KsqlClient\Query\ServerInfo
Ytake\KsqlClient\Query\Ksql
Ytake\KsqlClient\Query\Stream (for streams)
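
To make the role of each preset concrete, here is a minimal sketch of the KSQL REST endpoints the presets presumably correspond to. The function `ksqlEndpointFor` is hypothetical and not part of ytake/php-ksql, and the preset-to-endpoint mapping is an assumption based on the KSQL REST API, not something this README states.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of ytake/php-ksql): returns the HTTP method
 * and path of the KSQL REST endpoint a request preset is ASSUMED to target.
 *
 * @return array{0: string, 1: string} [method, path]
 */
function ksqlEndpointFor(string $preset, string $commandId = ''): array
{
    switch ($preset) {
        case 'CommandStatus':
            // Keep the "/" separators of the command ID, URL-encode each segment.
            $encoded = implode('/', array_map('rawurlencode', explode('/', $commandId)));
            return ['GET', '/status/' . $encoded];
        case 'Status':
            return ['GET', '/status'];
        case 'ServerInfo':
            return ['GET', '/info'];
        case 'Ksql':
            return ['POST', '/ksql'];
        case 'Stream':
            return ['POST', '/query'];
        default:
            throw new InvalidArgumentException("Unknown preset: {$preset}");
    }
}

[$method, $path] = ksqlEndpointFor('CommandStatus', 'stream/MESSAGE_STREAM/create');
echo $method . ' ' . $path . PHP_EOL; // GET /status/stream/MESSAGE_STREAM/create
```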

Syntax Reference

Get Command Status

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();
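
The command ID string above appears to follow a `type/entity/action` layout. As an illustration only, the sketch below splits such a string into its parts. `parseCommandId` is a hypothetical helper and is not the library's `CommandId` class, whose internals this README does not describe.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical parser (NOT the library's CommandId class).
 * Splits a "type/entity/action" command ID into named parts.
 *
 * @return array{type: string, entity: string, action: string}
 */
function parseCommandId(string $commandId): array
{
    $parts = explode('/', $commandId);
    // Exactly three non-empty segments are expected.
    if (count($parts) !== 3 || in_array('', $parts, true)) {
        throw new InvalidArgumentException("Expected type/entity/action, got: {$commandId}");
    }
    [$type, $entity, $action] = $parts;

    return ['type' => $type, 'entity' => $entity, 'action' => $action];
}

$parsed = parseCommandId('stream/MESSAGE_STREAM/create');
echo $parsed['entity'] . PHP_EOL; // MESSAGE_STREAM
```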

Get Statuses

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();

Get KSQL Server Information

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();

Query KSQL

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();
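
For orientation, the KSQL REST API accepts statements as a JSON body with a `ksql` field and an optional `streamsProperties` object. The sketch below builds such a body by hand. `buildKsqlPayload` is a hypothetical helper, and whether the client builds its request this way internally is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of ytake/php-ksql): builds a JSON body
 * in the shape the KSQL REST API expects for POST /ksql.
 */
function buildKsqlPayload(string $statement, array $streamsProperties = []): string
{
    $statement = trim($statement);
    if ($statement === '') {
        throw new InvalidArgumentException('Statement must not be empty.');
    }
    // KSQL statements end with a semicolon; append one if it is missing.
    if (substr($statement, -1) !== ';') {
        $statement .= ';';
    }
    $json = json_encode([
        'ksql' => $statement,
        // Cast so an empty property list is encoded as {} rather than [].
        'streamsProperties' => (object) $streamsProperties,
    ], JSON_UNESCAPED_SLASHES);
    if ($json === false) {
        throw new RuntimeException(json_last_error_msg());
    }

    return $json;
}

echo buildKsqlPayload('DESCRIBE users_original') . PHP_EOL;
// {"ksql":"DESCRIBE users_original;","streamsProperties":{}}
```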

Stream Response Client

<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row)
            {
                // Stream response consumer: handle each received $row here.
            }
        }
    )
)->result();
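
To illustrate what a row-by-row consumer has to cope with, the sketch below processes a streamed response body in chunks. It assumes the server sends one JSON object per line with a `row.columns` array, which is the format of older KSQL versions. `consumeStreamChunk` is a hypothetical function, not the library's implementation.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical consumer loop (not the library's code): passes the columns of
 * every complete newline-delimited JSON row in $buffer to $onRow and returns
 * the unparsed remainder, i.e. an incomplete trailing line.
 */
function consumeStreamChunk(string $buffer, callable $onRow): string
{
    // Handle each complete line; keep whatever follows the last newline.
    while (($pos = strpos($buffer, "\n")) !== false) {
        $line = trim(substr($buffer, 0, $pos));
        $buffer = (string) substr($buffer, $pos + 1);
        if ($line === '') {
            continue; // Blank lines are skipped (assumed to carry no data).
        }
        $decoded = json_decode($line, true);
        // Lines without a row.columns array are ignored in this sketch.
        if (is_array($decoded) && isset($decoded['row']['columns'])) {
            $onRow($decoded['row']['columns']);
        }
    }

    return $buffer;
}

$rows = [];
$rest = consumeStreamChunk(
    '{"row":{"columns":[1,"a"]}}' . "\n\n" . '{"row":{"col',
    function (array $columns) use (&$rows): void {
        $rows[] = $columns;
    }
);
echo count($rows) . PHP_EOL; // 1
```

The returned remainder would be prepended to the next chunk read from the connection, so a row split across two chunks is still decoded once it is complete.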