fr05t1k / phpclickhouse
PHP ClickHouse client library
0.17.12.2
2018-01-23 12:50 UTC
Requires
- php: >=5.5.0
- ext-curl: *
Requires (Dev)
- ext-curl: *
- phpunit/phpunit: 5.5.*
This package is auto-updated.
Last update: 2024-09-09 23:46:51 UTC
README
Features
- No dependencies, only curl
- Select parallel queries (asynchronous)
- Parallel bulk inserts from CSV files
- enable_http_compression for bulk inserts
- Find the active host and check the cluster
- Select WHERE IN ( local csv file )
- SQL conditions and templates
- Table size & database size
- listPartitions
- dropPartition & dropOldPartitions
- truncateTable in a cluster
- Insert arrays as columns
- Get the master node replica in a cluster
- Get tableSize on all nodes
- Async get ClickHouse progress
Install via composer
composer require smi2/phpclickhouse
Or install as a git submodule
git submodule add https://github.com/smi2/phpClickHouse.git
git submodule init
# update
git submodule update --init --recursive
git submodule update --remote
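After either install, the client only needs an autoloader; a minimal bootstrap sketch (the include.php path for the submodule layout is an assumption about where you placed the submodule):
// composer install: use the generated autoloader
require_once __DIR__ . '/vendor/autoload.php';

// git submodule install: the repository ships an include.php (path assumed)
// require_once __DIR__ . '/phpClickHouse/include.php';

$db = new ClickHouseDB\Client([
    'host' => '127.0.0.1', 'port' => '8123',
    'username' => 'default', 'password' => ''
]);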
Start
Connect and select the database
$config = [
    'host'     => '192.168.1.1',
    'port'     => '8123',
    'username' => 'default',
    'password' => ''
];
$db = new ClickHouseDB\Client($config);
$db->database('default');
$db->setTimeout(1.5);      // 1500 ms
$db->setTimeout(10);       // 10 seconds
$db->setConnectTimeOut(5); // 5 seconds
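To fail fast on a bad host or credentials, the connection can be checked up front; a minimal sketch, assuming the client's ping() helper is available in this version:
// hedged check: ping() runs a trivial query against the configured host
if (!$db->ping()) {
    throw new Exception('ClickHouse is not reachable at ' . $config['host']);
}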
Show tables
print_r($db->showTables());
Create a table
$db->write('
    CREATE TABLE IF NOT EXISTS summing_url_views (
        event_date Date DEFAULT toDate(event_time),
        event_time DateTime,
        site_id Int32,
        site_key String,
        views Int32,
        v_00 Int32,
        v_55 Int32
    )
    ENGINE = SummingMergeTree(event_date, (site_id, site_key, event_time, event_date), 8192)
');
Show create table
echo $db->showCreateTable('summing_url_views');
Insert data
$stat = $db->insert('summing_url_views',
    [
        [time(), 'HASH1', 2345, 22, 20, 2],
        [time(), 'HASH2', 2345, 12, 9, 3],
        [time(), 'HASH3', 5345, 33, 33, 0],
        [time(), 'HASH3', 5345, 55, 0, 55],
    ],
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);
Select
$statement = $db->select('SELECT * FROM summing_url_views LIMIT 2');
Working with Statement
// Count select rows
$statement->count();

// Count all rows
$statement->countAll();

// fetch one row
$statement->fetchOne();

// get extremes min
print_r($statement->extremesMin());

// totals row
print_r($statement->totals());

// result all
print_r($statement->rows());

// totalTimeRequest
print_r($statement->totalTimeRequest());

// raw answer, JsonDecode array, to save memory
print_r($statement->rawData());

// raw curl_info answer
print_r($statement->responseInfo());

// human size info
print_r($statement->info());

// if clickhouse-server version >= 54011
$db->settings()->set('output_format_write_statistics', true);
print_r($statement->statistics());
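For plain iteration, rows() returns the result as an array of associative rows keyed by column name; a minimal sketch against the select above:
// loop over the two rows selected from summing_url_views
foreach ($statement->rows() as $row) {
    echo $row['site_id'] . ' => ' . $row['views'] . PHP_EOL;
}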
Select result as a tree
$statement = $db->select('
    SELECT event_date, site_key, sum(views), avg(views)
    FROM summing_url_views
    WHERE site_id < 3333
    GROUP BY event_date, url_hash
    WITH TOTALS
');
print_r($statement->rowsAsTree('event_date.site_key'));
/*
(
    [2016-07-18] => Array
        (
            [HASH2] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH2
                    [sum(views)] => 12
                    [avg(views)] => 12
                )
            [HASH1] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH1
                    [sum(views)] => 22
                    [avg(views)] => 22
                )
        )
)
*/
Drop the table
$db->write('DROP TABLE IF EXISTS summing_url_views');
Features
Select parallel queries (asynchronous)
$state1 = $db->selectAsync('SELECT 1 as ping');
$state2 = $db->selectAsync('SELECT 2 as ping');

// run
$db->executeAsync();

// result
print_r($state1->rows());
print_r($state2->fetchOne('ping'));
Parallel bulk insert from CSV files
$file_data_names = [
    '/tmp/clickHouseDB_test.1.data',
    '/tmp/clickHouseDB_test.2.data',
    '/tmp/clickHouseDB_test.3.data',
    '/tmp/clickHouseDB_test.4.data',
    '/tmp/clickHouseDB_test.5.data',
];

// insert all files
$stat = $db->insertBatchFiles(
    'summing_url_views',
    $file_data_names,
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);
Parallel errors
selectAsync without executeAsync
$select = $db->selectAsync('SELECT * FROM summing_url_views LIMIT 1');
$insert = $db->insertBatchFiles('summing_url_views', ['/tmp/clickHouseDB_test.1.data'], ['event_time']);
// 'Exception' with message 'Queue must be empty, before insertBatch, need executeAsync'
See example/exam5_error_async.php
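In other words, any pending selectAsync() queue has to be flushed before a batch insert; a hedged sketch of one way to do that:
// flush the pending async selects first, then the batch insert is allowed
$select = $db->selectAsync('SELECT * FROM summing_url_views LIMIT 1');
$db->executeAsync();
$insert = $db->insertBatchFiles('summing_url_views', ['/tmp/clickHouseDB_test.1.data'], ['event_time']);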
Gzip & enable_http_compression
Reads the CSV files on the fly and compresses them with zlib.deflate.
$db->settings()->max_execution_time(200);
$db->enableHttpCompression(true);

$result_insert = $db->insertBatchFiles('summing_url_views', $file_data_names, [...]);

foreach ($result_insert as $fileName => $state) {
    echo $fileName . ' => ' . json_encode($state->info_upload()) . PHP_EOL;
}
See example/exam8_http_gzip_batch_insert.php
Table size & database size
Results are returned in human-readable sizes
print_r($db->databaseSize());
print_r($db->tablesSize());
print_r($db->tableSize('summing_partions_views'));
Partitions
$count_result = 2;
print_r($db->partitions('summing_partions_views', $count_result));
Drop partitions (pre-production)
$count_old_days = 10;
print_r($db->dropOldPartitions('summing_partions_views', $count_old_days));

// by `partition_id`
print_r($db->dropPartition('summing_partions_views', '201512'));
Select WHERE IN ( local csv file )
$file_name_data1 = '/tmp/temp_csv.txt'; // two column file [int,string]

$whereIn = new \ClickHouseDB\WhereInFile();
$whereIn->attachFile($file_name_data1, 'namex', ['site_id' => 'Int32', 'site_hash' => 'String'], \ClickHouseDB\WhereInFile::FORMAT_CSV);
$result = $db->select($sql, [], $whereIn);

// see example/exam7_where_in.php
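$sql above is whatever query references the attached file by the table name passed to attachFile() ('namex'); a hypothetical example, assuming the two columns declared above (see example/exam7_where_in.php for the real query):
// hypothetical query against the attached external table 'namex'
$sql = '
    SELECT site_id, site_key, count() AS cnt
    FROM summing_url_views
    WHERE (site_id, site_key) IN (SELECT site_id, site_hash FROM namex)
    GROUP BY site_id, site_key
';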
Simple SQL conditions & templates
Conditions are deprecated; if you need them, call: $db->enableQueryConditions();
Example with QueryConditions
$db->enableQueryConditions();

$input_params = [
    'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'],
    'limit'       => 5,
    'from_table'  => 'table'
];

$select = '
    SELECT * FROM {from_table}
    WHERE
    {if select_date}
        event_date IN (:select_date)
    {else}
        event_date = today()
    {/if}
    {if limit}
    LIMIT {limit}
    {/if}
';

$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();
/*
SELECT * FROM table
WHERE
event_date IN ('2000-10-10','2000-10-11','2000-10-12')
LIMIT 5
FORMAT JSON
*/

$input_params['select_date'] = false;
$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();
/*
SELECT * FROM table
WHERE
event_date=today()
LIMIT 5
FORMAT JSON
*/

$state1 = $db->selectAsync(
    'SELECT 1 as {key} WHERE {key} = :value',
    ['key' => 'ping', 'value' => 1]
);
// SELECT 1 as ping WHERE ping = "1"
Example of custom query degeneration in exam16_custom_degeneration.php:
SELECT {ifint VAR} result_if_intval_NON_ZERO{/if}
SELECT {ifint VAR} result_if_intval_NON_ZERO {else} BLA BLA{/if}
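A hedged sketch of how such a template might be invoked once the custom {ifint} handler from exam16_custom_degeneration.php is registered on the client; the registration itself lives in that example file, and the binding name VAR comes from the templates above:
// assumes the custom {ifint} degeneration from exam16_custom_degeneration.php is registered
$statement = $db->selectAsync(
    'SELECT {ifint VAR} result_if_intval_NON_ZERO {else} BLA BLA {/if}',
    ['VAR' => 1]
);
echo $statement->sql(); // with VAR = 1, only the non-zero branch should remain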
Settings
Three ways to set any setting
// in array config
$config = ['host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x',
           'settings' => ['max_execution_time' => 100]];
$db = new ClickHouseDB\Client($config);

// settings via constructor
$config = ['host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x'];
$db = new ClickHouseDB\Client($config, ['max_execution_time' => 100]);

// set method
$config = ['host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x'];
$db = new ClickHouseDB\Client($config);
$db->settings()->set('max_execution_time', 100);

// apply array method
$db->settings()->apply([
    'max_execution_time' => 100,
    'max_block_size'     => 12345
]);

// check
if ($db->settings()->getSetting('max_execution_time') !== 100) {
    throw new Exception('Bad work settings');
}

// see example/exam10_settings.php
Using session_id with ClickHouse
useSession()
- create a new session_id, or use an existing one: useSession(value)
// enable session_id
$db->useSession();
$sesion_AA = $db->getSession(); // return session_id

$db->write('CREATE TEMPORARY TABLE IF NOT EXISTS temp_session_test (number UInt64)');
$db->write('INSERT INTO temp_session_test SELECT number*1234 FROM system.numbers LIMIT 30');

// reconnect to continue with the other session
$db->useSession($sesion_AA);
Arrays as columns
$db->write('
    CREATE TABLE IF NOT EXISTS arrays_test_string (
        s_key String,
        s_arr Array(String)
    )
    ENGINE = Memory
');

$db->insert('arrays_test_string',
    [
        ['HASH1', ["a", "dddd", "xxx"]],
        ['HASH1', ["b'\tx"]],
    ],
    ['s_key', 's_arr']
);

// see example/exam12_array.php
Class for formatting rows
var_dump(\ClickHouseDB\FormatLine::CSV(['HASH1', ["a", "dddd", "xxx"]]));
var_dump(\ClickHouseDB\FormatLine::TSV(['HASH1', ["a", "dddd", "xxx"]]));

// example: write to file
$row = ['event_time' => date('Y-m-d H:i:s'), 'arr1' => [1, 2, 3], 'arrs' => ["A", "B\nD\nC"]];
file_put_contents($fileName, \ClickHouseDB\FormatLine::TSV($row) . "\n", FILE_APPEND);
Dropping old partitions in a cluster
Example code:
class my
{
    /**
     * @return \ClickHouseDB\Cluster
     */
    public function getClickHouseCluster()
    {
        return $this->_cluster;
    }

    public function msg($text)
    {
        echo $text . "\n";
    }

    private function cleanTable($dbt)
    {
        $sizes = $this->getClickHouseCluster()->getSizeTable($dbt);
        $this->msg("Clean table : $dbt, size = " . $this->humanFileSize($sizes));

        // split string "DB.TABLE"
        list($db, $table) = explode('.', $dbt);

        // Get Master node for table
        $nodes = $this->getClickHouseCluster()->getMasterNodeForTable($dbt);

        foreach ($nodes as $node) {
            $client = $this->getClickHouseCluster()->client($node);
            $size = $client->database($db)->tableSize($table);
            $this->msg("$node \t {$size['size']} \t {$size['min_date']} \t {$size['max_date']}");
            $client->dropOldPartitions($table, 30, 30);
        }
    }

    public function clean()
    {
        $this->msg("clean");
        $this->getClickHouseCluster()->setScanTimeOut(2.5); // 2500 ms
        $this->getClickHouseCluster()->setSoftCheck(true);

        if (!$this->getClickHouseCluster()->isReplicasIsOk()) {
            throw new Exception('Replica state is bad, error=' . $this->getClickHouseCluster()->getError());
        }

        $this->cleanTable('model.history_full_model_sharded');
        $this->cleanTable('model.history_model_result_sharded');
    }
}
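The class above assumes an already-built \ClickHouseDB\Cluster in $this->_cluster; a minimal construction sketch (the node list and config shape are illustrative assumptions):
// illustrative node list; every node is scanned with the same credentials
$cluster = new ClickHouseDB\Cluster([
    'host'     => ['192.168.1.1', '192.168.1.2'],
    'port'     => '8123',
    'username' => 'default',
    'password' => ''
]);
$cluster->setScanTimeOut(2.5); // 2500 ms
if (!$cluster->isReplicasIsOk()) {
    throw new Exception('Replica state is bad, error=' . $cluster->getError());
}
// pass $cluster into the class above as $this->_cluster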
HTTPS
$db = new ClickHouseDB\Client($config);
$db->settings()->https();
Read-only ClickHouse user
$config = [
    'host'     => '192.168.1.20',
    'port'     => '8123',
    'username' => 'ro',
    'password' => 'ro',
    'readonly' => true
];
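Per the 2016-12-09 changelog entry below, read-only mode can also be switched on for an existing client instead of via the config flag:
$db = new ClickHouseDB\Client($config);
$db->setReadOnlyUser(true); // same effect as 'readonly' => true in the config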
Write directly to a file
Send the result from ClickHouse straight to a file, without parsing the JSON.
$WriteToFile = new ClickHouseDB\WriteToFile('/tmp/_1_select.csv');
$db->select('select * from summing_url_views', [], null, $WriteToFile);

// or
$db->selectAsync('select * from summing_url_views limit 4', [], null, new ClickHouseDB\WriteToFile('/tmp/_3_select.tab', true, 'TabSeparatedWithNames'));
$db->selectAsync('select * from summing_url_views limit 4', [], null, new ClickHouseDB\WriteToFile('/tmp/_4_select.tab', true, 'TabSeparated'));
$statement = $db->selectAsync('select * from summing_url_views limit 54', [], null, new ClickHouseDB\WriteToFile('/tmp/_5_select.csv', true, ClickHouseDB\WriteToFile::FORMAT_CSV));
Insert associative bulk
$oneRow = [
    'one' => 1,
    'two' => 2,
    'thr' => 3,
];
$failRow = [
    'two' => 2,
    'one' => 1,
    'thr' => 3,
];

$db->insertAssocBulk([$oneRow, $oneRow, $failRow]);
progressFunction
// Apply function
$db->progressFunction(function ($data) {
    echo "CALL FUNCTION:" . json_encode($data) . "\n";
});

$st = $db->select('SELECT number, sleep(0.2) FROM system.numbers limit 5');

// Print
// ...
// CALL FUNCTION:{"read_rows":"2","read_bytes":"16","total_rows":"0"}
// CALL FUNCTION:{"read_rows":"3","read_bytes":"24","total_rows":"0"}
// ...
Debug & verbose
$db->verbose();
PHPUnit tests
Change the constants in phpunit.xml:
<php>
    <const name="phpunit_clickhouse_host" value="192.168.1.20" />
    <const name="phpunit_clickhouse_port" value="8123" />
    <const name="phpunit_clickhouse_user" value="default" />
    <const name="phpunit_clickhouse_pass" value="" />
    <const name="phpunit_clickhouse_tmp_path" value="/tmp/" />
</php>
License
MIT
ChangeLog
2017-12-28
- Fixed forcing FORMAT JSON when FORMAT is already set in the SQL
- GetRaw() - the result is the raw response if it is not JSON, e.g.:
  SELECT number as format_id FROM system.numbers LIMIT 3 FORMAT CSVWithNames
2017-12-22
- progressFunction()
- Escape values
2017-12-12
- Do not set FORMAT JSON if FORMAT is already set in the SQL
2017-11-22
- Added insertAssocBulk
2017-08-25
- Fixed tablesSize() to use a database filter
- Fixed partitions() to use a database filter
2017-08-14
- Added session_id support
2017-02-20
- Composer build 0.17.02
2016-12-09
- Read-only users need to set client->setReadOnlyUser(true); or $config['readonly'], see exam19_readonly_user.php
2016-11-25
- client->truncateTable('tableName')
- cluster->getMasterNodeForTable('dbName.tableName') // nodes with is_leader=1
- cluster->getSizeTable('dbName.tableName')
- cluster->getTables()
- cluster->truncateTable('dbName.tableName')
- See example cluster_06_truncate_table.php
2016-11-24
- Added cluster->setSoftCheck()
- insertBatchFiles() supports $file_names as string or array, and $columns_array as array or null
- Added insertBatchStream(), returns a \Curler\Request without executing it
- writeStreamData() returns a \Curler\Request
- Fixed httpCompression(false)
- Get headers from \Curler\Request as an array
- Added setReadFunction( function() ) to Request
- Added StreamInsert class, for reading directly from a stream_resource into clickhouse:stream
2016-11-04
- Added $db->insertBatchTSVFiles()
- Added a format parameter to $db->insertBatchFiles(,,,format)
- Deprecated the CSV class
- Added static class methods \ClickHouseDB\FormatLine::CSV(), \ClickHouseDB\FormatLine::TSV(), \ClickHouseDB\FormatLine::Insert()
- CSV RFC4180 - \ClickHouseDB\FormatLine::CSV(Array))."\n"
- Updated exam12_array.php + unit tests
2016-11-03
- $db->enableLogQueries(true) - write queries to system.query_log
- $db->enableExtremes(true); - extremes are now disabled by default
- $db->isExists($database, $table)
2016-10-27
- Added connect timeout: $db->setConnectTimeOut(5);
- Changed the default connect timeout to 5 seconds (previously 1 second)
- Changed the default DNS_CACHE to 120 seconds
2016-10-25 Release 0.16.10
- Fixed timeout bugs and added tests
2016-10-23
- client->setTimeout($seconds)
- cluster->clientLike($cluster,$ip_addr_like)
- Removed all migration code from the driver; moved to https://github.com/smi2/phpMigrationsClickhouse
2016-09-20 Release 0.16.09
- Version/release naming: [zero dot year dot month]
- Cluster support: new Cluster and ClusterQuery classes
- output_format_write_statistics, for clickhouse versions > v1.1.54019-stable
- Write to file in select and selectAsync
- Bindings and Conditions as degenerations
- $db->select(new Query("Select..."));
- Removed findActiveHostAndCheckCluster, clusterHosts, checkServerReplicas
- Added cleanQueryDegeneration(), addQueryDegeneration()
- Using Conditions requires $db->enableQueryConditions(); Conditions are disabled by default
- CurlerRequest->timeOut(2.5) = 2500 ms
- tablesSize() - added sizebytes
2016-08-11 Release 0.2.0
- Throw an exception on write errors
2016-08-06 Release 0.1.0
- Init