fr05t1k/phpclickhouse

php ClickHouse客户端库

0.17.12.2 2018-01-23 12:50 UTC

This package is auto-updated.

Last update: 2024-09-09 23:46:51 UTC


README

特性

  • 无依赖,仅使用curl
  • 选择并行查询(异步)
  • 并行从CSV文件批量插入
  • 启用_http_compression,用于批量插入
  • 查找活动主机并检查集群
  • 选择WHERE IN ( 本地csv文件 )
  • SQL条件和模板
  • 表格大小 & 数据库大小
  • listPartitions
  • dropPartition & dropOldPartitions
  • 在集群中truncateTable
  • 将数组作为列插入
  • 在集群中获取主节点副本
  • 在所有节点中获取tableSize
  • 异步获取ClickHouse进度

仓库中的俄语文章在habr上

安装composer

composer require smi2/phpclickhouse

或安装子模块

git submodule add https://github.com/smi2/phpClickHouse.git
git submodule init

# update
git submodule update --init --recursive
git submodule update --remote

Packagist

开始

连接并选择数据库

$config = [
    'host' => '192.168.1.1',
    'port' => '8123',
    'username' => 'default',
    'password' => ''
];
$db = new ClickHouseDB\Client($config);
$db->database('default');
$db->setTimeout(1.5);      // 1500 ms
$db->setTimeout(10);       // 10 seconds
$db->setConnectTimeOut(5); // 5 seconds

显示表格

print_r($db->showTables());

创建表格

$db->write('
    CREATE TABLE IF NOT EXISTS summing_url_views (
        event_date Date DEFAULT toDate(event_time),
        event_time DateTime,
        site_id Int32,
        site_key String,
        views Int32,
        v_00 Int32,
        v_55 Int32
    )
    ENGINE = SummingMergeTree(event_date, (site_id, site_key, event_time, event_date), 8192)
');

显示创建表格

echo $db->showCreateTable('summing_url_views');

插入数据

$stat = $db->insert('summing_url_views',
    [
        [time(), 'HASH1', 2345, 22, 20, 2],
        [time(), 'HASH2', 2345, 12, 9,  3],
        [time(), 'HASH3', 5345, 33, 33, 0],
        [time(), 'HASH3', 5345, 55, 0, 55],
    ],
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);

选择

$statement = $db->select('SELECT * FROM summing_url_views LIMIT 2');

使用Statement工作

// Count select rows
$statement->count();

// Count all rows
$statement->countAll();

// fetch one row
$statement->fetchOne();

// get extremes min
print_r($statement->extremesMin());

// totals row
print_r($statement->totals());

// result all
print_r($statement->rows());

// totalTimeRequest
print_r($statement->totalTimeRequest());

// raw answer JsonDecode array, for economy memory
print_r($statement->rawData());

// raw curl_info answer
print_r($statement->responseInfo());

// human size info
print_r($statement->info());

// if clickhouse-server version >= 54011
$db->settings()->set('output_format_write_statistics',true);
print_r($statement->statistics());

将选择结果作为树形显示

$statement = $db->select('
    SELECT event_date, site_key, sum(views), avg(views)
    FROM summing_url_views
    WHERE site_id < 3333
    GROUP BY event_date, url_hash
    WITH TOTALS
');

print_r($statement->rowsAsTree('event_date.site_key'));

/*
(
    [2016-07-18] => Array
        (
            [HASH2] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH2
                    [sum(views)] => 12
                    [avg(views)] => 12
                )
            [HASH1] => Array
                (
                    [event_date] => 2016-07-18
                    [url_hash] => HASH1
                    [sum(views)] => 22
                    [avg(views)] => 22
                )
        )
)
*/

删除表格

$db->write('DROP TABLE IF EXISTS summing_url_views');

特性

选择并行查询(异步)

$state1 = $db->selectAsync('SELECT 1 as ping');
$state2 = $db->selectAsync('SELECT 2 as ping');

// run
$db->executeAsync();

// result
print_r($state1->rows());
print_r($state2->fetchOne('ping'));

并行从CSV文件大量插入

$file_data_names = [
    '/tmp/clickHouseDB_test.1.data',
    '/tmp/clickHouseDB_test.2.data',
    '/tmp/clickHouseDB_test.3.data',
    '/tmp/clickHouseDB_test.4.data',
    '/tmp/clickHouseDB_test.5.data',
];

// insert all files
$stat = $db->insertBatchFiles(
    'summing_url_views',
    $file_data_names,
    ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55']
);

并行错误

selectAsync不执行executeAsync

$select = $db->selectAsync('SELECT * FROM summing_url_views LIMIT 1');
$insert = $db->insertBatchFiles('summing_url_views', ['/tmp/clickHouseDB_test.1.data'], ['event_time']);
// 'Exception' with message 'Queue must be empty, before insertBatch, need executeAsync'

见example/exam5_error_async.php

Gzip & enable_http_compression

在飞读CSV文件并压缩zlib.deflate。

$db->settings()->max_execution_time(200);
$db->enableHttpCompression(true);

$result_insert = $db->insertBatchFiles('summing_url_views', $file_data_names, [...]);


foreach ($result_insert as $fileName => $state) {
    echo $fileName . ' => ' . json_encode($state->info_upload()) . PHP_EOL;
}

见example/exam8_http_gzip_batch_insert.php

表格大小 & 数据库大小

结果以人类可读大小显示

print_r($db->databaseSize());
print_r($db->tablesSize());
print_r($db->tableSize('summing_partions_views'));

分区

$count_result = 2;
print_r($db->partitions('summing_partions_views', $count_result));

删除分区(预生产)

$count_old_days = 10;
print_r($db->dropOldPartitions('summing_partions_views', $count_old_days));

// by `partition_id`
print_r($db->dropPartition('summing_partions_views', '201512'));

选择WHERE IN ( 本地csv文件 )

$file_name_data1 = '/tmp/temp_csv.txt'; // two column file [int,string]
$whereIn = new \ClickHouseDB\WhereInFile();
$whereIn->attachFile($file_name_data1, 'namex', ['site_id' => 'Int32', 'site_hash' => 'String'], \ClickHouseDB\WhereInFile::FORMAT_CSV);
$result = $db->select($sql, [], $whereIn);

// see example/exam7_where_in.php

简单的sql条件和模板

conditions已弃用,如需使用:$db->enableQueryConditions();

带有QueryConditions的示例

$db->enableQueryConditions();

$input_params = [
  'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'],
  'limit' => 5,
  'from_table' => 'table'
];

$select = '
    SELECT * FROM {from_table}
    WHERE
    {if select_date}
        event_date IN (:select_date)
    {else}
        event_date=today()
    {/if}
    {if limit}
    LIMIT {limit}
    {/if}
';

$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();

/*
SELECT * FROM table
WHERE
event_date IN ('2000-10-10','2000-10-11','2000-10-12')
LIMIT 5
FORMAT JSON
*/

$input_params['select_date'] = false;
$statement = $db->selectAsync($select, $input_params);
echo $statement->sql();

/*
SELECT * FROM table
WHERE
event_date=today()
LIMIT 5
FORMAT JSON
*/

$state1 = $db->selectAsync(
    'SELECT 1 as {key} WHERE {key} = :value',
    ['key' => 'ping', 'value' => 1]
);

// SELECT 1 as ping WHERE ping = "1"

exam16_custom_degeneration.php中自定义查询降级示例

SELECT {ifint VAR} result_if_intval_NON_ZERO{/if}
SELECT {ifint VAR} result_if_intval_NON_ZERO {else} BLA BLA{/if}

设置

三种方式设置任何设置

// in array config
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x',
    'settings' => ['max_execution_time' => 100]
];
$db = new ClickHouseDB\Client($config);

// settings via constructor
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x'
];
$db = new ClickHouseDB\Client($config, ['max_execution_time' => 100]);

// set method
$config = [
    'host' => 'x',
    'port' => '8123',
    'username' => 'x',
    'password' => 'x'
];
$db = new ClickHouseDB\Client($config);
$db->settings()->set('max_execution_time', 100);

// apply array method
$db->settings()->apply([
    'max_execution_time' => 100,
    'max_block_size' => 12345
]);

// check
if ($db->settings()->getSetting('max_execution_time') !== 100) {
    throw new Exception('Bad work settings');
}

// see example/exam10_settings.php

使用session_id与ClickHouse

useSession() - 创建新的session_id或使用现有的useSession(value)

// enable session_id
$db->useSession();
$sesion_AA=$db->getSession(); // return session_id

$db->write(' CREATE TEMPORARY TABLE IF NOT EXISTS temp_session_test (number UInt64)');
$db->write(' INSERT INTO temp_session_test SELECT number*1234 FROM system.numbers LIMIT 30');

// reconnect to continue with other session
$db->useSession($sesion_AA);

数组作为列

$db->write('
    CREATE TABLE IF NOT EXISTS arrays_test_string (
        s_key String,
        s_arr Array(String)
    )
    ENGINE = Memory
');

$db->insert('arrays_test_string',
    [
        ['HASH1', ["a", "dddd", "xxx"]],
        ['HASH1', ["b'\tx"]],
    ],
    ['s_key', 's_arr']
);

// see example/exam12_array.php

用于格式化行的类

var_dump(
    \ClickHouseDB\FormatLine::CSV(
        ['HASH1', ["a", "dddd", "xxx"]]
    )
);

var_dump(
    \ClickHouseDB\FormatLine::TSV(
        ['HASH1', ["a", "dddd", "xxx"]]
    )
);

// example write to file
$row=['event_time'=>date('Y-m-d H:i:s'),'arr1'=>[1,2,3],'arrs'=>["A","B\nD\nC"]];
file_put_contents($fileName,\ClickHouseDB\FormatLine::TSV($row)."\n",FILE_APPEND);

集群删除旧分区

示例代码

class my
{
    /**
     * @return \ClickHouseDB\Cluster
     */
    public function getClickHouseCluster()
    {
            return $this->_cluster;
    }

    public function msg($text)
    {
            echo $text."\n";
    }

    private function cleanTable($dbt)
    {

        $sizes=$this->getClickHouseCluster()->getSizeTable($dbt);
        $this->msg("Clean table : $dbt,size = ".$this->humanFileSize($sizes));

        // split string "DB.TABLE"
        list($db,$table)=explode('.',$dbt);

        // Get Master node for table
        $nodes=$this->getClickHouseCluster()->getMasterNodeForTable($dbt);
        foreach ($nodes as $node)
        {
            $client=$this->getClickHouseCluster()->client($node);

            $size=$client->database($db)->tableSize($table);

            $this->msg("$node \t {$size['size']} \t {$size['min_date']} \t {$size['max_date']}");

            $client->dropOldPartitions($table,30,30);
        }
    }

    public function clean()
    {
        $this->msg("clean");

        $this->getClickHouseCluster()->setScanTimeOut(2.5); // 2500 ms
        $this->getClickHouseCluster()->setSoftCheck(true);
        if (!$this->getClickHouseCluster()->isReplicasIsOk())
        {
            throw new Exception('Replica state is bad , error='.$this->getClickHouseCluster()->getError());
        }

        $this->cleanTable('model.history_full_model_sharded');

        $this->cleanTable('model.history_model_result_sharded');
    }
}

HTTPS

$db = new ClickHouseDB\Client($config);
$db->settings()->https();

只读ClickHouse用户

$config = [
    'host' => '192.168.1.20',
    'port' => '8123',
    'username' => 'ro',
    'password' => 'ro',
    'readonly' => true
];

直接写入文件

从ClickHouse发送结果,不解析json。

$WriteToFile=new ClickHouseDB\WriteToFile('/tmp/_1_select.csv');
$db->select('select * from summing_url_views',[],null,$WriteToFile);
// or
$db->selectAsync('select * from summing_url_views limit 4',[],null,new ClickHouseDB\WriteToFile('/tmp/_3_select.tab',true,'TabSeparatedWithNames'));
$db->selectAsync('select * from summing_url_views limit 4',[],null,new ClickHouseDB\WriteToFile('/tmp/_4_select.tab',true,'TabSeparated'));
$statement=$db->selectAsync('select * from summing_url_views limit 54',[],null,new ClickHouseDB\WriteToFile('/tmp/_5_select.csv',true,ClickHouseDB\WriteToFile::FORMAT_CSV));

插入关联批量

 $oneRow = [
            'one' => 1,
            'two' => 2,
            'thr' => 3,
            ];
            $failRow = [
                'two' => 2,
                'one' => 1,
                'thr' => 3,
            ];

$db->insertAssocBulk([$oneRow, $oneRow, $failRow])

progressFunction

// Apply function

$db->progressFunction(function ($data) {
    echo "CALL FUNCTION:".json_encode($data)."\n";
});
$st=$db->select('SELECT number,sleep(0.2) FROM system.numbers limit 5');


// Print
// ...
// CALL FUNCTION:{"read_rows":"2","read_bytes":"16","total_rows":"0"}
// CALL FUNCTION:{"read_rows":"3","read_bytes":"24","total_rows":"0"}
// ...

调试 & 详细

$cl->verbose();

PHPUnit测试

在phpunit.xml更改常量

<php>
    <const name="phpunit_clickhouse_host" value="192.168.1.20" />
    <const name="phpunit_clickhouse_port" value="8123" />
    <const name="phpunit_clickhouse_user" value="default" />
    <const name="phpunit_clickhouse_pass" value="" />
    <const name="phpunit_clickhouse_tmp_path" value="/tmp/" />
</php>

许可证

MIT

变更日志

2017-12-28

  • 修复FORMAT JSON,如果SQL中设置了FORMAT
  • GetRaw() - 如果不是json,则结果为原始响应SELECT number as format_id FROM system.numbers LIMIT 3 FORMAT CSVWithNames

2017-12-22

  • progressFunction()
  • 转义值

2017-12-12

  • 如果SQL中设置了FORMAT,则不要设置FORMAT JSON

2017-11-22

  • 添加insertAssocBulk

2017-08-25

  • 修复tablesSize(),使用数据库过滤器
  • 修复partitions(),使用数据库过滤器

2017-08-14

  • 添加session_id支持

2017-02-20

  • 构建composer 0.17.02

2016-12-09

  • 对于只读用户需要设置:client->setReadOnlyUser(true);$confi['readonly'],见exam19_readonly_user.php

2016-11-25

  • client->truncateTable('tableName')
  • cluster->getMasterNodeForTable('dbName.tableName') // 节点具有is_leader=1
  • cluster->getSizeTable('dbName.tableName')
  • cluster->getTables()
  • cluster->truncateTable('dbName.tableName')
  • 见示例cluster_06_truncate_table.php

2016-11-24

  • 添加cluster->setSoftCheck()
  • insertBatchFiles()支持$file_names - 字符串或数组,$columns_array - 数组或null
  • 添加insertBatchStream()返回\Curler\Request不执行
  • writeStreamData()返回\Curler\Request
  • 修复httpCompression(false)
  • \Curler\Request获取headers作为数组
  • Request中添加setReadFunction( function() )
  • 添加StreamInsert类,直接从stream_resource读取到clickhouse:stream

2016-11-04

  • 添加$db->insertBatchTSVFiles(),
  • $db->insertBatchFiles(,,,format) 中添加格式参数。
  • 已弃用类 CSV
  • 添加静态类 \ClickHouseDB\FormatLine:CSV(),\ClickHouseDB\FormatLine:TSV(),\ClickHouseDB\FormatLine:Insert()
  • CSV RFC4180 - \ClickHouseDB\FormatLine::CSV(Array))."\n"
  • 更新 exam12_array.php + 单元测试

2016-11-03

  • $db->enableLogQueries(true) - 写入系统.query_log
  • $db->enableExtremes(true); - 默认极端值现在,已禁用
  • $db->isExists($database,$table)

2016-10-27

  • 添加连接超时,$db->setConnectTimeOut(5);
  • 将默认连接超时改为 5 秒。之前是 1 秒。
  • 将 DNS_CACHE 默认值改为 120 秒

2016-10-25 发布 0.16.10

  • 修复超时错误并添加测试

2016-10-23

2016-09-20 发布 0.16.09

  • 版本/发布名称:[零点年点月]
  • 支持集群:新的类 Cluster 和 ClusterQuery
  • output_format_write_statistics,对于 clickhouse 版本 > v1.1.54019-stable
  • 在 select,selectAsync 中写入到文件
  • 绑定与条件的退化
  • $db->select(new Query("Select..."));
  • 删除 findActiveHostAndCheckCluster , clusterHosts , checkServerReplicas
  • 添加 cleanQueryDegeneration(),addQueryDegeneration()
  • 使用 Conditions 需要 $db->enableQueryConditions(); 默认 Conditions - 禁用;
  • CurlerRequest->timeOut(2.5) = 2500 毫秒
  • tablesSize() - 添加 sizebytes

2016-08-11 发布 0.2.0

  • 错误写入时抛出异常

2016-08-06 发布 0.1.0

  • 初始化