ziffmedia / laravel-ksql
Requires
- php: ^8.1
- ziffmedia/ksql: ^0.4
Requires (Dev)
- calebporzio/sushi: ^2.4
- laravel/pint: ^1.4
- mockery/mockery: ^1.5
- orchestra/testbench: ^7.19
- pestphp/pest: ^1.22
- pestphp/pest-plugin-laravel: ^1.4
This package is auto-updated.
Last update: 2024-09-15 23:07:35 UTC
README
A Laravel-focused client for KsqlDB's REST api. This package is designed to integrate tightly with laravel but does not strictly require it. The core KSQL client only depends on symfony/http-client
.
需求
- PHP 8.1 或更高版本
安装
使用 composer 安装包:composer require ziffmedia/laravel-ksql
发布 ksql.php
配置文件: php artisan vendor:publish --provider "ZiffMedia\LaravelKsql\KsqlServiceProvider"
使用
手动构建基本 KSQL 客户端
$client = new Ziffmedia\LaravelKsql\Client("http://my.ksqldb.server:8088");
手动构建带 HTTP 基本认证的基本 KSQL 客户端
$client = new Ziffmedia\LaravelKsql\Client("http://my.ksqldb.server:8088", "myusername", "mypassword");
注意:如果使用 Confluent Cloud,用户名和密码应分别是 KSQL API 密钥和 API 密钥。
使用 Laravel DI 容器基于配置值工厂客户端
$client = app(ZiffMedia\LaravelKsql\KsqlClient::class)
配置文件和环境变量
<?php return [ "endpoint" => env("KSQL_ENDPOINT"), "auth" => [ "username" => env("KSQL_USERNAME"), "password" => env("KSQL_PASSWORD") ], "consumer_queries" => [ // examples included, but this block is only necessary if using the consumer command "simple_example" => "SELECT * FROM foo EMIT CHANGES;", "custom_event_example" => [ "query" => "SELECT * FROM foo EMIT CHANGES;", "event" => App\Events\FooChanged::class ] ] ];
The KsqlQueryResult Class
All queries (streaming or otherwise) will return an instance of Ziffmedia\LaravelKsql\Ksql\QueryResult
. This class is a value object that contains the following public properties
class QueryResult { public string $query; // the sql string used to produce this result public array $columns; // an associative array keyed by column names in the result, with values equal to the data type for that column public string|null $name; // the convenience name key used for this query. Value will be null unless this result was produced from multiplexing public array $data; // an associative array of column name to column value. This will represent one row of a results data set. }
The KsqlStreamChanged Event
This class is a Laravel-native event. It contains one public property, which is the QueryResult object that caused that event to be emitted. You can listen to this event in your Laravel application, and add business logic based on the contents of the QueryResult
. For more granular control, you can subclass this event to enable queuing and more control over discrete listening for result types when multiplexing or using the ksql-consumer
command. Any method that allows emitting of events in the client will take an optional classname to use for the event emitted.
class KsqlStreamChanged { public QueryResult $result; }
非流查询
$client = new Ziffmedia\LaravelKsql\Client("http://my.ksqldb.server:8088"); // full synchronous query returning an array of QueryResult objects /** @var QueryResult[] $result */ $result = $client->query("SELECT * FROM MYTABLE LIMIT 5"); // use an optional row handler $client->query("SELECT * FROM MYTABLE LIMIT 5", function(QueryResult $row) { dump($row); }) // emit events using the built-in event class $client->query("SELECT * FROM MYTABLE LIMIT 5", true); // emit events using a custom event class $client->query("SELECT * FROM MYTABLE LIMIT 5", App\Event\MyTableChanged::class);
流查询
查询单个流(查询以 "EMIT CHANGES" 结尾)与 ->query()
方法具有相同的基本功能,但 ->stream()
方法旨在在命令行环境中用于长运行进程。 ->stream()
除非在网络级别断开连接或从 KsqlDB HTTP 服务器接收到 EOF,否则不会返回。
$client = new Ziffmedia\LaravelKsql\Client("http://my.ksqldb.server:8088"); // emit events using the built-in event class $client->stream("SELECT * FROM MYTABLE EMIT CHANGES"); // use an optional row handler $client->stream("SELECT * FROM MYTABLE EMIT CHANGES", function(QueryResult $row) { dump($row); }) // emit events using a custom event class $client->stream("SELECT * FROM MYTABLE EMIT CHANGES", App\Event\MyTableChanged::class);
多路复用多个流查询
您可能希望实际使用 HTTP/2 连接池同时多路复用多个流查询。使用 ->multiplex()
方法返回一个流多路复用器对象,可以在调用 ->stream()
开始流式传输之前使用多个查询调用 "构建"。
对于多路复用查询,每个 ->query()
调用都支持与单个流查询相同的选项,包括事件发射事件、使用可调用行处理程序等。每个对 ->query()
的调用都有一个额外的前置参数,该参数是该查询的 "名称键"。这既用于内部标识来自哪个流的数据,也返回到您的 QueryResult
实例中。
$client = new Ziffmedia\LaravelKsql\Client("http://my.ksqldb.server:8088"); $client->multiplex() ->query('mytable', "SELECT * FROM MYTABLE EMIT CHANGES") ->query('yourtable', 'SELECT * FROM YOURTABLE EMIT CHANGES') ->stream();
使用内置的消费者命令
此包提供了一个方便的 artisan
命令,可以读取您的 config/ksql.php
中的流查询列表,并对多个查询发出事件到您的应用程序。这包括查询流并具有在流更改时执行应用程序逻辑的钩子的常见用例。简单地在您的 Laravel 应用程序中创建事件监听器,并运行 artisan ksql-consumer
,您的监听器将在数据通过 KSQL 流(或表)流动时被触发。
$> php artisan ksql-consumer
配置文件语法还提供了一个功能,可以针对每个查询指定自定义事件类,尽管它不允许使用可调用的行处理器。如果您需要行处理器而不是事件监听器(不常见),您需要使用上面的客户端示例来创建自己的工具。