level23/druid-client

Druid PHP 客户端,用于执行查询等操作


README

Build Coverage Status Packagist Version Total Downloads Quality Score Software License

本项目的目标是简化从 Druid 中选择数据的操作。

该项目提供了一个简单的查询构建器,用于创建复杂的 Druid 查询。

它还提供了一种管理 Druid 中数据源(表)以及从文件中导入新数据的方法。

要求

本软件包仅需要 Guzzle 作为依赖项。PHP 和 Guzzle 版本要求如下。

安装

要安装此软件包,您可以使用 composer

composer require level23/druid-client

您还可以将其作为 ZIP 文件下载,并将其包含在项目中,前提是您的项目中已有 Guzzle。

变更日志和升级

有关不同版本中的更改以及如何升级到最新版本的信息,请参阅 变更日志

Laravel/Lumen 支持

本软件包已针对 Laravel/Lumen 准备就绪。它可以在 Laravel/Lumen 项目中使用,但不是必需的。

Laravel

对于 Laravel,软件包将自动发现。

Lumen

如果您正在使用 Lumen 项目,只需将服务提供者在 bootstrap/app.php 中包含即可。

// Register the druid-client service provider
$app->register(Level23\Druid\DruidServiceProvider::class);

Laravel/Lumen 配置

您还应在 Laravel/Lumen 项目的 .env 中定义正确的端点 URL。

DRUID_BROKER_URL=http://broker.url:8082
DRUID_COORDINATOR_URL=http://coordinator.url:8081
DRUID_OVERLORD_URL=http://overlord.url:8090
DRUID_RETRIES=2
DRUID_RETRY_DELAY_MS=500
DRUID_TIMEOUT=60
DRUID_CONNECT_TIMEOUT=10
DRUID_POLLING_SLEEP_SECONDS=2

如果您正在使用 Druid 路由器进程,您还可以设置路由器 URL,然后它将用于代理、总管和协调器。

DRUID_ROUTER_URL=http://druid-router.url:8080

待办事项

  • 在 CompactTaskBuilder 中支持构建 metricSpec 和 DimensionSpec
  • 实现基于 Hadoop 的批量摄取(索引)
  • 实现 Avro 流和 Avro OCF 输入格式

示例

有关更多信息,请参阅 druid 单机教程中的几个示例。见 此页面

目录

文档

以下是一个如何使用此包的示例。

请查看内联注释以获取更多信息/反馈。

示例

<?php

error_reporting(E_ALL);
ini_set('display_errors', 'On');

include __DIR__ . '/../vendor/autoload.php';

use Level23\Druid\DruidClient;
use Level23\Druid\Types\Granularity;
use Level23\Druid\Filters\FilterBuilder;

$client = new DruidClient(['router_url' => 'https://router.url:8080']);

$response = $client->query('traffic-hits', Granularity::ALL)
    // REQUIRED: you have to select the interval where to select the data from.
    ->interval('now - 1 day', 'now')
    // Simple dimension select
    ->select('browser')
    // Select a dimension with a different output name.
    ->select('country_iso', 'Country')
    // Alternative way to select a dimension with a different output name. 
    // If you want, you can select multiple dimensions at once.
    ->select(['mccmnc' => 'carrierCode'])
    // Select a dimension, but change its value using a lookup function.
    ->lookup('carrier_title', 'mccmnc', 'carrierName', 'Unknown')
    // Select a dimension, but use an expression to change the value.
    ->selectVirtual("timestamp_format(__time, 'yyyy-MM-dd HH:00:00')", 'hour')
    // Summing a metric.
    ->sum('hits', 'totalHits')
    // Sum hits which only occurred at night
    ->sum('hits', 'totalHitsNight', function(FilterBuilder $filter) {
        $filter->whereInterval('__time', ['yesterday 20:00/today 6:00']); 
    })
    // Count the total number of rows (per the dimensions selected) and store it in totalNrRecords.
    ->count('totalNrRecords')
    // Count the number of dimensions. NOTE: Theta Sketch extension is required to run this aggregation.
    ->distinctCount('browser', 'numberOfBrowsers')
    // Build some filters.
    ->where('hits', '>', 1000)
    // When no operator is given, we assume an equals (=)
    ->where('browser', 'Yandex.Browser')
    ->orWhere('browser_version', '17.4.0')
    // Where filters using Closures are supported.
    ->orWhere(function (FilterBuilder $builder) {
        $builder->where('browser_version', '17.5.0');
        $builder->where('browser_version', '17.6.0');
    })
    // Filter using an IN filter.
    ->whereIn('video_id', [1, 152, 919])
    // Filter using a between filter. It's an inclusive filter, like "age >= 18 and age <= 99".   
    ->whereBetween('age', 18, 99)
    // Limit the number of results.
    ->limit(5)
    // Apply a having filter, this is applied after selecting the records. 
    ->having('totalHits', '>', 100)
    // Sort the results by this metric/dimension
    ->orderBy('totalHits', 'desc')
    // Execute the query. Optionally you can specify Query Context parameters.
    ->execute(['groupByIsSingleThreaded' => false, 'sortByDimsFirst' => true]);

DruidClient

DruidClient 类是所有操作开始的地方。您初始化 druid 客户端的一个实例,该实例包含您实例的配置。

DruidClient 构造函数有以下参数

有关配置设置的完整列表,请查看默认值,这些默认值在 DruidClient 类的 $config 属性中定义。

此类支持一些 Druid 的新功能。为了确保您的服务器支持这些功能,建议您提供 version 配置设置。

默认情况下,我们将使用 guzzle 客户端来处理应用程序和 Druid 服务器之间的连接。如果您想更改此设置,例如,因为您想使用代理,您可以使用自定义 guzzle 客户端来执行此操作。

自定义 guzzle 客户端的示例

// Create a custom guzzle client which uses an http proxy.
$guzzleClient = new GuzzleHttp\Client([
    'proxy' => 'tcp://:8125',
    'timeout' => 30,
    'connect_timeout' => 10
]);

// Create a new DruidClient, which uses our custom Guzzle Client 
$druidClient = new DruidClient(
    ['router_url' => 'http://druid.router.com'], 
    $guzzleClient
);

// Query stuff here.... 

DruidClient 类提供了各种方法。最常用的是 query() 方法,它允许您构建并执行查询。

DruidClient::auth()

如果您已配置了带有身份验证的 Druid 集群,您可以使用此方法提供您的用户名/密码。用户名/密码将以 HTTP Basic Auth 参数的形式发送到请求中。

另请参阅:https://druid.apache.org/docs/latest/operations/auth/

auth() 方法有两个参数

您还可以覆盖客户端并使用自己的机制。请参阅DruidClient

DruidClient::query()

query() 方法为您提供一个 QueryBuilder 实例,允许您构建查询然后执行它。

示例

$client = new DruidClient(['router_url' => 'https://router.url:8080']);

// retrieve our query builder, group the results per day.
$builder = $client->query('wikipedia', Granularity::DAY);

// Now build your query ....
// $builder->select( ... )->where( ... )->interval( ... );  

查询方法有两个参数

QueryBuilder 允许您选择维度、聚合度量数据、应用过滤条件和 having 过滤条件等。

当您未指定数据源时,您需要在查询构建器中稍后指定它。有各种方法可以这样做。请参阅QueryBuilder:数据源

有关查询构建器的更多信息,请参阅以下章节。

DruidClient::cancelQuery()

cancelQuery() 方法允许您取消查询。要取消查询,您必须知道其唯一标识符。当您执行查询时,您可以在查询上下文中自己指定唯一标识符。

示例

$client = new DruidClient(['router_url' => 'https://router.url:8080']);

// For example, this returns my-query6148716d3772c
$queryId = uniqid('my-query');

// Please note: this will be blocking until we have got result from druid.
// So cancellation has to be done within another php process. 
$result = $client
    ->query('wikipedia', Granularity::DAY) 
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['namespace', 'page'])
    ->execute(['queryId' => $queryId]);

现在您可以在另一个进程中取消此查询。例如,如果您将正在运行的查询存储在某个地方,您可以通过执行此操作来“停止”正在运行的查询

$client->cancelQuery('my-query6148716d3772c')

查询方法有1个参数

如果取消失败,该方法将抛出异常。否则,它不会返回任何结果。

另请参阅:https://druid.apache.org/docs/latest/querying/querying.html#query-cancellation

DruidClient::compact()

方法 compact() 返回一个 CompactTaskBuilder 对象,允许您构建一个压缩任务。

有关更多信息,请参阅 compact()

DruidClient::reindex()

方法 compact() 返回一个 IndexTaskBuilder 对象,允许您构建一个重新索引任务。

有关更多信息,请参阅 reindex()

DruidClient::taskStatus()

方法 taskStatus() 允许您获取任务标识符的状态。

有关更多信息及示例,请参阅 reindex()compact()

DruidClient::pollTaskStatus()

方法 pollTaskStatus() 允许您等待直到任务状态不是 RUNNING

有关更多信息及示例,请参阅 reindex()compact()

DruidClient::metadata()

方法 metadata() 返回一个 MetadataBuilder 对象,允许您从您的 druid 实例检索元数据。有关更多信息,请参阅元数据章节。

QueryBuilder: 通用查询方法

我们将描述一些通用方法,这些方法可以由(几乎)所有查询使用。

interval()

由于 Druid 是一个时序数据库,您始终需要指定您想要查询的时间范围。使用此方法,您可以做到这一点。

间隔方法非常灵活,支持各种参数格式。

所有这些示例都是有效的

// Select an interval with string values. Anything which can be parsed by the DateTime object
// can be given. Also, "yesterday" or "now" is valid.
$builder->interval('2019-12-23', '2019-12-24');

// When a string is given which contains a slash, we will split it for you and parse it as "begin/end".
$builder->interval('yesterday/now');

// A "raw" interval as druid uses them is also allowed
$builder->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z');

// You can also give DateTime objects
$builder->interval(new DateTime('yesterday'), new DateTime('now'));

// Carbon is also supported, as it extends DateTime
$builder->interval(Carbon::now()->subDay(), Carbon::now());

// Timestamps are also supported:
$builder->interval(1570643085, 1570729485);

开始日期应在结束日期之前。如果不是,将抛出 InvalidArgumentException

您可以多次调用此方法以从不同的数据集中选择。

方法 interval() 有以下参数

limit()

方法 limit() 允许您限制查询的结果集。

限制可以用于所有查询类型。然而,对于 TopN 查询和 Select 查询,它是强制性的。

参数 $offset 仅适用于 GroupByScan 查询,并且仅从 druid 版本 0.20.0 开始支持。

返回结果时跳过这么多行。跳过的行仍然需要内部生成然后丢弃,这意味着提高偏移量可能会导致查询使用额外的资源。

一起,$limit$offset 可以用于实现分页。但是,请注意,如果底层数据源在页面提取之间被修改,从而影响总体查询结果,那么不同的页面可能不会相互对齐。

示例

// Limit the result to 50 rows, but skipping the first 20 rows.
$builder->limit(50, 20);

方法 limit() 有以下参数

orderBy()

方法 orderBy() 允许您以给定方式对结果进行排序。此方法仅适用于 GroupByTopN 查询。您应使用 orderByDirection()

示例

$builder
  ->select('channel')
  ->longSum('deleted')
  ->orderBy('deleted', OrderByDirection::DESC)
  ->groupBy();

方法 orderBy() 有以下参数

有关排序顺序的更多信息,请参阅此页面:https://druid.apache.org/docs/latest/querying/sorting-orders.html

请注意:此方法因查询类型而异。请阅读以下内容以了解每种查询类型中此方法的工作方式。

GroupBy 查询

您可以多次调用此方法,向查询添加排序。GroupBy 查询仅允许在给出限制的情况下对结果进行排序。如果您不提供限制,我们将使用默认限制 999999

TopN 查询

对于此查询类型,调用此方法为强制要求。您应该使用您想按其排序结果的维度或度量来调用此方法。

orderByDirection()

orderByDirection() 方法允许您指定排序的方向。此方法仅适用于时间序列、选择和扫描查询。对于分组和 TopN 查询,请使用 orderBy()

示例

$response = $client->query('wikipedia', 'hour')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->longSum('deleted')    
    ->select('__time', 'datetime')
    ->orderByDirection(OrderByDirection::DESC)
    ->timeseries();

orderByDirection() 方法具有以下参数

pagingIdentifier()

pagingIdentifier() 允许您对结果集进行分页。这仅适用于 SELECT 查询。

当您执行选择查询时,您将返回一个分页标识符。要请求下一个“页面”,请在您的下一个请求中使用此分页标识符。

示例

// Build a select query
$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user', 'deleted', 'added'])    
    ->limit(10);

// Execute the query for "page 1"
$response1 = $builder->selectQuery();

// Now, request "page 2".
 $builder->pagingIdentifier($response1->getPagingIdentifier());

// Execute the query for "page 2".
$response2 = $builder->selectQuery($context);

分页标识符是一个数组,看起来像这样

  'wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-26T18:30:14.418Z' => 10,
)

pagingIdentifier() 方法具有以下参数

subtotals()

subtotals() 方法允许您检索查询中各个维度上的聚合。这与 MySQL 中的 WITH ROLLUP 逻辑相当。

注意:此方法仅适用于分组查询!

示例

// Build a groupBy query with subtotals
$response = $client->query('wikipedia')
    ->interval('2015-09-12 20:00:00', '2015-09-12 22:00:00')
    ->selectVirtual("timestamp_format(__time, 'yyyy-MM-dd HH:00:00')", 'hour')
    ->select('namespace')
    ->count('edits')
    ->longSum('added')
    // select all namespaces which begin with Draft.
    ->where('namespace', 'like', 'Draft%')
    ->subtotals([
        ['hour', 'namespace'], // get the results per hour, namespace 
        ['hour'], // get the results per hour
        [] // get the results in total (everything together)
    ])
    ->groupBy();

示例响应(注意:结果已转换为表格以获得更好的可读性)

+------------+---------------------+-------+-------+
| namespace  | hour                | added | edits | 
+------------+---------------------+-------+-------+
| Draft      | 2015-09-12 20:00:00 | 0     | 1     | 
| Draft talk | 2015-09-12 20:00:00 | 359   | 1     | 
| Draft      | 2015-09-12 21:00:00 | 656   | 1     |
+------------+---------------------+-------+-------+ 
|            | 2015-09-12 20:00:00 | 359   | 2     | 
|            | 2015-09-12 21:00:00 | 656   | 1     |
+------------+---------------------+-------+-------+ 
|            |                     | 1015  | 3     | 
+------------+---------------------+-------+-------+

如你所见,前三条记录是按‘小时’和‘命名空间’的结果。
然后,有两条记录仅按‘小时’。
最后,最后一条记录是‘总计’。

subtotals() 方法具有以下参数

metrics()

使用 metrics() 方法,您可以在执行 selectQuery() 时指定要选择的度量。

注意:此仅适用于选择查询类型!

示例

$result = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user'])
    ->metrics(['deleted', 'added'])
    ->selectQuery();

metrics() 方法具有以下参数

dimensions()

使用 dimensions() 方法,您可以为搜索查询指定应使用的维度。

注意:此仅适用于搜索查询类型!另请参阅 搜索 查询。此方法不应与为其他查询类型选择维度混淆。有关选择查询维度的更多信息,请参阅 维度选择

// Build a Search Query
$response = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->dimensions(['namespace', 'channel']) 
    ->searchContains('wikipedia')
    ->search();

dimensions() 方法具有以下参数

toArray()

toArray() 方法将尝试构建查询。我们将尝试自动检测最佳查询类型。然后,我们将构建查询并返回查询作为数组。

示例

$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user', 'deleted', 'added'])    
    ->limit(10);

// Show the query as an array
print_r($builder->toArray());

toArray() 方法具有以下参数

toJson()

toJson() 方法将尝试构建查询。我们将尝试自动检测最佳查询类型。然后,我们将构建查询并返回查询作为 JSON 字符串。

示例

$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user', 'deleted', 'added'])    
    ->limit(10);

// Show the query as an array
var_export($builder->toJson());

toJson() 方法具有以下参数

QueryBuilder: 数据源

默认情况下,您将指定查询中要从其中选择数据的数据源。例如

$builder = $client->query('wikipedia');

在本章中,我们将解释如何动态更改它,例如,连接其他数据源。

from()

您可以使用此方法覆盖/更改当前使用的数据源(如果有的话)。

您可以提供一个字符串,该字符串将被解释为 druid 数据源表。您还可以指定一个实现 DataSourceInterface 的对象。

此方法具有以下参数

$builder = $client->query('hits_short');

// For example, use a different dataSource if the given date is older than one week.
if( Carbon::parse($date)->isBefore(Carbon::now()->subWeek()) ) {
    $builder->from('hits_long');
}

fromInline()

内联数据源允许您查询嵌入在查询本身中的少量数据。当您想在不需要先加载数据的情况下在少量数据上编写查询时,它们很有用。它们也用作连接的输入。

每一行都是一个数组,其长度必须与列名列表的长度完全相同。每个数组的第一个元素对应于列名列表中的第一个列,依此类推。

另请参阅: https://druid.apache.org/docs/latest/querying/datasource.html#inline

此方法具有以下参数

$builder = $client->query()->fromInline(
    ["country", "city"]
    [
        ["United States", "San Francisco"], 
        ["Canada", "Calgary"]
    ]
)->select(["country", "city"]); // etc. 

join()

使用此方法可以连接另一个数据源。自 druid 版本 0.23.0 以来,此功能可用。

请注意,连接在 Druid 中作为子查询执行,这可能会对性能产生重大影响。

参见

$builder = $client->query('users')
    ->interval('now - 1 week', 'now')
    ->join('departments', 'dep', 'dep.id = users.department_id')
    ->select([ /*...*/ ]);

您也可以将子查询指定为连接。例如

$builder = $client->query('users')
    ->interval('now - 1 week', 'now')
    ->join(function(\Level23\Druid\Queries\QueryBuilder $subQuery) {
        $subQuery->from('departments')
            ->where('name', '!=', 'Staff');
    }, 'dep', 'dep.id = users.department_id')
    ->select([ /*...*/ ]);

您还可以指定另一个数据源作为值。例如,您可以创建一个新的 JoinDataSource 对象并将其作为值传递。然而,为此已经创建了简单的方法(例如 joinLookup()),所以您可能不需要使用此方法。当您想与内联数据连接时,这可能会很有用(您可以使用 InlineDataSource

此方法具有以下参数

leftJoin()

这与 join() 方法的工作方式相同,但连接类型始终为 LEFT。

innerJoin()

这与 join() 方法的工作方式相同,但连接类型始终为 INNER。

joinLookup()

使用此方法,您可以以数据源的方式连接查找。

查找数据源是以键值对为导向的,并且始终恰好有两列:k(键)和v(值),并且两者都是字符串。

示例

$builder = $client->query('users')
    ->interval('now - 1 week', 'now')
    ->join('departments', 'dep', 'users.department_id = dep.k')
    ->select('dep.v', 'departmentName')
    ->select('...')

参见:https://druid.apache.org/docs/latest/querying/datasource.html#lookup

此方法具有以下参数

union()

并集允许您将两个或多个表视为单个数据源。在 SQL 中,这是通过将 UNION ALL 操作符直接应用于表来完成的,称为“表级并集”。在原生查询中,这是通过“union”数据源来完成的。

使用原生并集数据源时,被并集的表不需要具有相同的模式。如果它们不完全匹配,则存在于一个表中但不在另一个表中存在的列将被视为在它们不存在的表中包含所有空值。

在两种情况下,都无法使用诸如表达式、列别名、JOIN、GROUP BY、ORDER BY 等功能与表并集。

参见:https://druid.apache.org/docs/latest/querying/datasource.html#union

示例

$builder = $client->query('hits_us')
    ->union(['hits_eu', 'hits_as'], true);

// This will result in a query on the dataSources: hits_us, hits_eu and hits_as.
// This is because the "append" argument is set to true.

$builder = $client->query('hits_us')
    ->union(['hits_eu', 'hits_as'], false);

// This will result in a query on the dataSources: hits_eu and hits_as.
// This is because the "append" argument is set to false. It will overwrite the current dataSource.

此方法具有以下参数

QueryBuilder: 维度选择

维度是您通常过滤的域,或按 分组 数据的字段。典型示例包括:国家、名称、城市等。

要选择 维度,您可以使用以下方法之一

select()

此方法具有以下参数

此方法允许您以各种方式选择维度,如上面示例所示。

您可以使用

简单维度选择

$builder->select('country_iso');

具有替代输出名称的维度选择

$builder->select('country_iso', 'Country');

一次选择多个维度

$builder->select(['browser', 'country_iso', 'age', 'gender']);

一次选择具有替代输出名称的多个维度

$builder->select([
    'browser'     => 'TheBrowser', 
    'country_iso' => 'CountryIso', 
    'age'         => 'Age',
    'gender'      => 'MaleOrFemale'
])

更改维度的输出类型

$builder->select('age', null, DataType::LONG);

lookup()

此方法允许您使用注册的查找函数查找维度。有关注册的查找函数的更多信息,请参阅以下页面

查找是将 ID 值转换为用户可读名称的便捷方式,例如将 user_id 转换为 username,而无需在数据集中存储用户名。

此方法具有以下参数

示例

$builder->lookup('lookupUsername', 'user_id', 'username', 'Unknown'); 

inlineLookup()

此方法允许您使用预定义的列表查找维度。

查找是将 ID 值转换为用户可读名称的便捷方式,例如将 category_id 转换为 category,而无需在数据集中存储类别。

此方法具有以下参数

示例

$departments = [
    1 => 'Administration',
    2 => 'Marketing',
    3 => 'Shipping',
    4 => 'IT',
    5 => 'Accounting',
    6 => 'Finance'
];

$builder->inlineLookup($departments, 'department_id', 'department', 'Unknown'); 

multiValueListSelect()

此维度规范仅保留给定列表中存在的值。

参见

示例

$builder->multiValueListSelect('tags', ['a', 'b', 'c'], 'testTags', DataType::STRING); 

multiValueRegexSelect()

此维度规范仅保留与正则表达式匹配的值。

参见

示例

$builder->multiValueRegexSelect('tags', '^test', 'testTags', DataType::STRING); 

multiValuePrefixSelect()

此维度规范仅保留与给定前缀匹配的值。

参见

示例

$builder->multiValuePrefixSelect('tags', 'test', 'testTags', DataType::STRING); 

QueryBuilder: 指标聚合

度量是您通常聚合的字段,例如求和此字段的值,典型示例包括:

  • 收入
  • 点击量
  • 点击/观看/购买次数
  • 转化率
  • 页面浏览量
  • 计数器

要聚合度量,您可以使用以下方法之一。

所有指标聚合都支持过滤器选择。如果提供了这个条件,指标聚合将只应用于与过滤器匹配的记录。

示例

// count how many page views are done by kids
$builder->longSum('pageViews', 'pageViewsByKids', function(FilterBuilder $filter) {
    $filter->where('age', '<=', 16); 
});

请参阅此页面: https://druid.apache.org/docs/latest/querying/aggregations.html

此方法使用以下参数

count()

此聚合将返回匹配过滤器的行数。

请注意,计数聚合器计算Druid行数,这并不总是反映摄取的原始事件数。这是因为Druid可以在摄取时配置汇总数据。要计算摄取数据的行数,请在摄取时包含计数聚合器,并在查询时包含longSum聚合器。

示例

$builder->count('nrOfResults');

sum()

sum()聚合计算值的总和,作为64位有符号整数。

注意:替代方案包括:longSum()doubleSum()floatSum(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->sum('views', 'totalViews');

sum()聚合方法有以下参数

min()

min()聚合计算所有度量值的最大值。

注意:替代方案包括:longMin()doubleMin()floatMin(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->min('age', 'minAge');

min()聚合方法有以下参数

max()

max()聚合计算所有度量值的最小值。

注意:替代方案包括:longMax()doubleMax()floatMax(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->max('age', 'maxAge');

max()聚合方法有以下参数

first()

first()聚合计算具有最小时间戳的度量值或不存在行时的0。

注意:替代方案包括:longFirst()doubleFirst()floatFirst()stringFirst(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->first('device');

first()聚合方法有以下参数

last()

last()聚合计算具有最大时间戳的度量值或不存在行时的0。

请注意,在启用汇总的段上使用最后聚合器的查询将返回汇总值,而不是原始摄取数据中的最后一个值。

注意:替代方案包括:longLast()doubleLast()floatLast()stringLast(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->last('email');

last()聚合方法有以下参数

any()

any()聚合将获取任何度量值。这也可能为null。

注意:替代方案包括:longAny()doubleAny()floatAny()stringAny(),您可以通过使用适当的方法名直接指定输出类型。这些方法没有$type参数。

示例

$builder->any('price');

any()聚合方法有以下参数

javascript()

javascript()聚合在一系列列上计算任意JavaScript函数(既可以是指标也可以是维度)。您的JavaScript函数应返回浮点值。

注意:基于JavaScript的功能默认是禁用的。请参阅Druid JavaScript编程指南,了解有关使用Druid的JavaScript功能的指南,包括如何启用它的说明: https://druid.apache.org/docs/latest/development/javascript.html

示例

$builder->javascript(
    'result',
    ['x', 'y'],
    "function(current, a, b)      { return current + (Math.log(a) * b); }",
    "function(partialA, partialB) { return partialA + partialB; }",
    "function()                   { return 10; }"
);

javascript()聚合方法有以下参数

hyperUnique()

hyperUnique() 聚合使用 HyperLogLog 计算在索引时间被聚合为 "hyperUnique" 指标维度的估计基数。

请注意:当 Theta Sketch 扩展可用时,请使用 distinctCount(),因为它要快得多。

有关更多信息,请参阅此页面:https://druid.apache.org/docs/latest/querying/hll-old.html#hyperunique-aggregator

此页面也非常好地解释了 hyperUnique 的用法:https://cleanprogrammer.net/getting-unique-counts-from-druid-using-hyperloglog/

示例

$builder->hyperUnique('dimension', 'myResult');

hyperUnique() 聚合方法有以下参数

cardinality()

cardinality() 聚合计算 Apache Druid(孵化版)维度集合的基数,使用 HyperLogLog 估计基数。

请注意:当 Theta Sketch 扩展可用时,请使用 distinctCount(),因为它要快得多。此聚合器也将比使用 hyperUnique() 聚合器索引列慢得多。

通常,我们强烈建议您在不需要关注维度的单个值的情况下,使用 distinctCount()hyperUnique() 聚合器而不是 cardinality() 聚合器。

当将 $byRow 设置为 false(默认值)时,它计算由所有给定维度的所有维度值的并集组成的集合的基数。对于单个维度,这相当于

SELECT COUNT(DISTINCT (dimension))
FROM <datasource>

对于多个维度,这相当于类似以下的内容

SELECT COUNT(DISTINCT (value))
FROM (SELECT dim_1 as value
      FROM <datasource>
      UNION
      SELECT dim_2 as value
      FROM <datasource>
      UNION
      SELECT dim_3 as value
      FROM <datasource>)

当将 $byRow 设置为 true 时,它按行计算基数,即不同维度组合的基数。这相当于类似以下的内容

SELECT COUNT(*)
FROM (SELECT DIM1, DIM2, DIM3 FROM <datasource> GROUP BY DIM1, DIM2, DIM3)

有关更多信息,请参阅:https://druid.apache.org/docs/latest/querying/hll-old.html#cardinality-aggregator

示例

$builder->cardinality( 'nrOfCategories', ['category_id']);    

您还可以使用一个 Closure 函数,它将接收一个 DimensionBuilder。这样,您可以构建更复杂的情况,例如

$builder->cardinality(
    'itemsPerCountry',
    function(DimensionBuilder $dimensions) {
        // select the country name by its iso value.
        $dimensions->lookup('country_name', 'iso');        
    },
    false, # byRow
    false # round
);

cardinality() 聚合方法有以下参数

distinctCount()

distinctCount() 聚合函数计算给定维度的唯一发生次数。

此方法使用 Theta Sketch 扩展,并且应该启用以使用此聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Count the distinct number of categories. 
$builder->distinctCount('category_id', 'categoryCount');

distinctCount() 聚合方法有以下参数

doublesSketch()

doublesSketch() 聚合函数将创建一个 DoubleSketch 数据字段,该字段可用于各种后聚合方法,以对收集到的数据进行额外计算。

DoubleSketch 是一种可合并的流式算法,用于估计值的分布,并近似回答关于值排名、分布的概率质量函数(PMF)或直方图、累积分布函数(CDF)和分位数(中位数、最小值、最大值、95百分位数等)的查询。

此方法使用 datasketches 扩展,并且应该启用以使用此聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-quantiles.html

示例

// Get the 95th percentile of the salaries per country.
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    ->quantile('quantile95', 'salaryData', 0.95) // this uses the data which was collected 

要查看关于 doubleSketch 数据的更多信息,请参阅 sketchSummary() 后聚合方法。

doublesSketch() 聚合方法有以下参数

QueryBuilder: 过滤器

使用过滤器,您可以针对某些值进行过滤。以下过滤器可用

where()

这可能是最常用的过滤器。它非常灵活。

此方法使用以下参数

以下 $operator 值被支持

此方法支持快速等于简写。例如

$builder->where('name', 'John');

等同于

$builder->where('name', '=', 'John');

我们还支持使用一个 Closure 将各种过滤器组合成一个过滤器。它将接收一个 FilterBuilder。例如

$builder->where(function (FilterBuilder $filterBuilder) {
    $filterBuilder->orWhere('namespace', 'Talk');
    $filterBuilder->orWhere('namespace', 'Main');
});
$builder->where('channel', 'en');

这将与 SQL 等价:SELECT ... WHERE (namespace = 'Talk' OR 'namespace' = 'Main') AND 'channel' = 'en';

最后,您还可以提供一个原始的过滤器对象。例如

$builder->where( new SelectorFilter('name', 'John') );

然而,这并不推荐,也不应该需要。

orWhere()

where() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereNot()

使用这个过滤器,您可以构建一个不应该匹配的过滤器集。因此,它是反转的。

示例

$builder->whereNot(function (FilterBuilder $filterBuilder) {
    $filterBuilder->orWhere('namespace', 'Talk');
    $filterBuilder->orWhere('namespace', 'Main');
});

您可以将此与其他所有过滤器一起使用!

此方法具有以下参数

orWhereNot()

whereNot() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereNull()

Druid 已经更改了其 NULL 处理方式。现在您可以通过配置 druid.generic.useDefaultValueForNull=false 来配置它以存储 NULL 值。

如果配置了此选项,您可以使用此过滤器过滤 NULL 值。

此方法具有以下参数

示例

// filter on all places where city name is NULL.
$builder->whereNull('city'); 

// filter on all places where the country is NOT NULL!
$builder->whereNot(function (FilterBuilder $filterBuilder) {
    $filterBuilder->whereNull('country');    
});

orWhereNull()

whereNull() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereIn()

使用此方法,您可以过滤使用多个值的记录。

此方法具有以下参数

示例

// filter where country in "it", "de" or "au".
$builder->whereIn('country_iso', ['it', 'de', 'au']); 

orWhereIn()

whereIn() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereArrayContains()

使用此方法,您可以过滤数组是否包含指定的元素。

此方法具有以下参数

示例

$builder->whereArrayContains('features', 'myNewFeature'); 

orWhereArrayContains()

whereArrayContains() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereBetween()

此过滤器将选择给定维度大于或等于给定 $minValue 且小于给定 $maxValue 的记录。

SQL 等价于:SELECT ... WHERE field >= $minValue AND field < $maxValue

此方法具有以下参数

orWhereBetween()

whereBetween() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereColumn()

whereColumn() 过滤器将两个维度进行比较。只有当维度匹配时,才会返回记录。

whereColumn() 过滤器有以下参数

orWhereColumn()

whereColumn() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereInterval()

区间过滤器使可以对包含长毫秒值的列进行范围过滤,边界以 ISO 8601 时间间隔指定。它适用于 __time 列、长指标列和具有可解析为长毫秒的值的维度。

此过滤器将 ISO 8601 间隔转换为长毫秒起始/结束范围。然后它将使用 between 过滤器来查看间隔是否匹配。

此方法具有以下参数

$intervals 数组可以包含以下内容

  • 一个 Interval 对象
  • druid 中使用的原始间隔字符串。例如:“2019-04-15T08:00:00.000Z/2019-04-15T09:00:00.000Z”
  • 间隔字符串,使用 /(例如“12-02-2019/13-02-2019”)分隔起始和结束
  • 包含两个元素的数组,分别为起始和结束日期。这些可以是 DateTime 对象、Unix 时间戳或任何可以被 DateTime::__construct 解析的内容

有关更多信息,请参阅 interval() 方法。

示例

$builder->whereInterval('__time', ['12-09-2019/13-09-2019', '19-09-2019/20-09-2019']);

orWhereInterval()

whereInterval() 相同,但现在我们将使用 or 而不是 and 来连接之前添加的过滤器。

whereFlags()

此过滤器允许您在维度值应与过滤器使用位运算 AND 比较匹配的情况下进行过滤。

支持 64 位整数。

从版本 0.20.2 开始,Druid 支持 位运算标志。在此之前,我们构建了自己的变体,但那时需要 JavaScript 支持。要使用 JavaScript 变体,您应该将此方法的第 4 个参数传递为 true

默认情况下禁用了基于JavaScript的功能。有关使用Druid的JavaScript功能(包括如何启用它的说明)的指南,请参阅Druid JavaScript编程指南:https://druid.apache.org/docs/latest/development/javascript.html

示例

$client = new \Level23\Druid\DruidClient([
    'router_url' => 'https://router.url:8080',
]);

$client->query('myDataSource')
    ->interval('now - 1 day', 'now')
    // Select records where the first and third bit are enabled (1 and 4)
    ->whereFlags('flags', (1 | 4));

此方法具有以下参数

orWhereFlags()

whereFlags()相同,但现在我们将使用or而不是and来连接之前添加的过滤器。

whereExpression()

此过滤器允许您根据Druid表达式进行筛选。另请参阅:https://druid.apache.org/docs/latest/querying/math-expr

此过滤器提供了更多灵活性,但由于尚未实施所有筛选优化,因此可能比本页上其他过滤器的组合性能较低。

示例

$client = new \Level23\Druid\DruidClient([
    'router_url' => 'https://router.url:8080',
]);

$client->query('myDataSource')
    ->interval('now - 1 day', 'now')
    ->whereExpression('((product_type == 42) && (!is_deleted))');

此方法具有以下参数

orWhereExpression()

whereExpression()相同,但现在我们将使用or而不是and来连接之前添加的过滤器。

whereSpatialRectangular()

此过滤器允许您根据您的空间维度是否在给定的矩形区域内筛选记录。

示例

$client = new \Level23\Druid\DruidClient([
    'router_url' => 'https://router.url:8080',
]);

$client->query('myDataSource')
    ->interval('now - 1 day', 'now')
    ->whereSpatialRectangular('location', [0.350189, 51.248163], [-0.613861, 51.248163]);

此方法具有以下参数

orWhereSpatialRectangular()

whereSpatialRectangular()相同,但现在我们将使用or而不是and来连接之前添加的过滤器。

whereSpatialRadius()

此过滤器允许您根据您的空间维度是否在给定点的半径内筛选记录。

示例

$client = new \Level23\Druid\DruidClient([
    'router_url' => 'https://router.url:8080',
]);

$client->query('myDataSource')
    ->interval('now - 1 day', 'now')
    ->whereSpatialRectangular('location', [0.350189, 51.248163], [-0.613861, 51.248163]);

此方法具有以下参数

orElseSpatialRadius()

whereSpatialRadius()相同,但现在我们将使用or而不是and来连接之前添加的过滤器。

whereSpatialPolygon()

此过滤器允许您根据您的空间维度是否在给定的多边形内筛选记录。

示例

$client = new \Level23\Druid\DruidClient([
    'router_url' => 'https://router.url:8080',
]);

$client->query('myDataSource')
    ->interval('now - 1 day', 'now')
    ->whereSpatialPolygon('location', [0.350189, 51.248163], [-0.613861, 51.248163]);

此方法具有以下参数

orElseSpatialPolygon()

orWhereSpatialPolygon()相同,但现在我们将使用or而不是and来连接之前添加的过滤器。

QueryBuilder:拥有过滤条件

使用筛选器,您可以在数据检索后筛选记录。这允许您对聚合值进行筛选。

另请参阅此页面:https://druid.apache.org/docs/latest/querying/having.html

以下是对所有筛选方法的说明。

having()

having()过滤器与where()过滤器非常相似,非常灵活。

此方法具有以下参数

以下 $operator 值被支持

此方法支持快速等于简写。例如

// select everybody with 2 kids
$builder->having('sumKids', 2);

等同于

$builder->having('sumKids', '=', 2);

我们还支持使用Closure在1个过滤器中分组各种筛选。它将接收一个HavingBuilder。例如

$builder->having(function (FilterBuilder $filterBuilder) {
    $filterBuilder->orHaving('sumKats', '>', 0);
    $filterBuilder->orHaving('sumDogs', '>', 0);
});
$builder->having('sumKids', '=', 0);

这相当于SQL等价语句:SELECT ... HAVING (sumKats > 0 OR sumDogs > 0) AND sumKids = 0;

最后,您还可以提供原始筛选器或筛选过滤器对象。例如

// example using a having filter
$builder->having( new GreaterThanHavingFilter('totalViews', 15) );

// example using a "normal" filter.
$builder->having( new SelectorFilter('totalViews', '15') );

然而,这并不推荐,也不应该需要。

orHaving()

having()相同,但现在我们将使用or而不是and来连接之前添加的筛选过滤器。

QueryBuilder:虚拟列

虚拟列允许您基于一个表达式创建一个新的“虚拟”列。这非常强大,但在Druid手册中并未很好地记录。

Druid表达式允许您执行各种操作,例如

  • 执行查找并使用结果
  • 对值执行数学运算
  • 使用if,else表达式
  • 连接字符串
  • 使用“case”语句
  • 等等。

有关可用表达式的完整列表,请参阅此页面:https://druid.apache.org/docs/latest/querying/math-expr

要使用虚拟列,您应该使用virtualColumn()方法

virtualColumn()

此方法根据给定的表达式创建一个虚拟列。

虚拟列是在查询期间从一组列创建的查询可查询列“视图”。

虚拟列可能从多个底层列中获取,尽管虚拟列始终以单个列的形式呈现。

虚拟列可以用作维度或作为聚合器的输入。

注意:虚拟列不会自动添加到您的输出中。如果您想将其也添加到输出中,请单独选择它。使用selectVirtual()一次完成。

示例

// Increase our reward with $2,00 if this sale was done by a promoter. 
$builder->virtualColumn('if(promo_id > 0, reward + 2, 0)', 'rewardWithPromoterPayout', 'double')
    // Now sum all our rewards with the promoter payouts included.
    ->doubleSum('rewardWithPromoterPayout', 'totalRewardWithPromoterPayout');

此方法具有以下参数

selectVirtual()

此方法创建了一个虚拟列,就像 virtualColumn() 方法一样,但此方法还会在输出中选取该虚拟列。

示例

// Select the mobile device type as text, but only if isMobileDevice = 1 
$builder->selectVirtual(
    "if( isMobileDevice = 1, case_simple( mobileDeviceType, '1', 'samsung', '2', 'apple', '3', 'nokia', 'other'), 'no mobile device')", 
    "deviceType"
);

此方法具有以下参数

QueryBuilder:后聚合

后聚合是在从 druid 数据库中检索结果之后执行的聚合。

fieldAccess()

fieldAccess() 后聚合方法本身并不是一个聚合方法,但您需要它来访问在其他后聚合中使用到的字段。

例如,当您想计算每个职位功能的平均薪资时

$builder
    ->select('jobFunction')
    ->doubleSum('salary', 'totalSalary')
    ->longSum('nrOfEmployees')
    // avgSalary = totalSalary / nrOfEmployees   
    ->divide('avgSalary', function(PostAggregationsBuilder $builder) {
        $builder->fieldAccess('totalSalary');
        $builder->fieldAccess('nrOfEmployees');
    });

但是,您也可以使用这种简写,它将被转换为 fieldAccess 方法

$builder
    ->select('jobFunction')
    ->doubleSum('salary', 'totalSalary')
    ->longSum('nrOfEmployees')
    // avgSalary = totalSalary / nrOfEmployees   
    ->divide('avgSalary', ['totalSalary', 'nrOfEmployees']);

这完全一样。我们将为您将给定的字段转换为 fieldAccess()

fieldAccess() 后聚合器有以下参数

constant()

constant() 后聚合方法允许您定义一个可以在后聚合函数中使用的常量。

例如,当您想根据半径计算圆的面积时

根据公式 (半径 x 半径 x π) 查找圆的面积。

$builder
    ->select('radius')
    ->multiply('area', function(PostAggregationsBuilder $builder){
        $builder->multiply('r2', ['radius', 'radius']);
        $builder->constant('3.141592654', 'pi');
    });

constant() 后聚合器有以下参数

表达式

expression() 后聚合方法允许您提供原生 druid 表达式,该表达式允许您计算结果值。

Druid表达式允许您执行各种操作,例如

  • 执行查找并使用结果
  • 对值执行数学运算
  • 使用 if,else 语句
  • 连接字符串
  • 使用“case”语句
  • 等等。

有关可用表达式的完整列表,请参阅此页面:https://druid.apache.org/docs/latest/querying/math-expr

示例

$builder
    ->sum('kids', 'totalKids')
    ->sum('adults', 'totalAdults')
    ->expression('totalHumans', 'totalKids + totalAdults', null, DataType::LONG)

expression() 后聚合器有以下参数

divide()

divide() 后聚合方法将给定的字段进行除法运算。如果除以 0,结果总是 0。

示例

$builder
    ->select('jobFunction')
    ->doubleSum('salary', 'totalSalary')
    ->longSum('nrOfEmployees')
    // avgSalary = totalSalary / nrOfEmployees   
    ->divide('avgSalary', ['totalSalary', 'nrOfEmployees']);

第一个参数是结果名称,结果将在输出中可用。您想除以的字段可以通过以下方式提供。下面将描述这些方式

方法 1:数组

您可以将要用于除法的字段作为数组提供。它们将被转换为 fieldAccess() 调用。

示例

$builder->divide('avgSalary', ['totalSalary', 'nrOfEmployees']);

方法 2:可变参数列表

您可以在方法调用中提供要用于除法的字段作为额外参数。它们将被转换为 fieldAccess() 调用。

示例

// This will become: avgSalary = totalSalary / nrOfEmployees / totalBonus
$builder->divide('avgSalary', 'totalSalary', 'nrOfEmployees', 'totalBonus');

方法 3:闭包

您还可以提供闭包,这允许您构建更高级的数学计算。

示例

// This will become: avgSalary = totalSalary / nrOfEmployees / ( bonus + tips )
$builder->divide('avgSalary', function(PostAggregationsBuilder $builder){    
    $builder->fieldAccess('totalSalary');
    $builder->fieldAccess('nrOfEmployees');    

    $builder->add('totalBonus', ['bonus', 'tips']);    
});

divide() 后聚合器有以下参数

multiply()

multiply() 后聚合方法将给定的字段相乘。

示例

$builder->multiply('volume', ['width', 'height', 'depth']);

multiply() 后聚合器有以下参数

subtract()

subtract() 后聚合方法将给定的字段相减。

示例

$builder->subtract('total', ['revenue', 'taxes']);

subtract() 后聚合器有以下参数

add()

add() 后聚合方法将给定的字段相加。

示例

$builder->add('total', ['salary', 'bonus']);

add() 后聚合器有以下参数

quotient()

quotient() 后聚合方法将计算给定字段值之间的商。除法运算类似于常规的浮点除法。

示例

// for example: quotient = 15 / 4 = 3 (e.g., how much times fits 4 into 15?)
$builder->quotient('quotient', ['dividend', 'divisor']);

add() 后聚合器有以下参数

longGreatest()doubleGreatest()

longGreatest()doubleGreatest() 后聚合方法计算所有字段的最大值。

doubleMax() 聚合器和 doubleGreatest() 后聚合器之间的区别在于,doubleMax 返回特定列的所有行的最高值,而 doubleGreatest 返回同一行的多个列的最高值。这些类似于 SQL MAX 和 GREATEST 函数。

示例

$builder 
  ->longSum('a', 'totalA')
  ->longSum('b', 'totalB')
  ->longSum('c', 'totalC')
  ->longGreatest('highestABC', ['a', 'b', 'c']);    

longGreatest()doubleGreatest() 后聚合器有以下参数

longLeast()doubleLeast()

longLeast()doubleLeast() 后聚合方法计算所有字段的最小值。

doubleMin() 聚合器和 doubleLeast() 后聚合器的区别在于,doubleMin 返回特定列中所有行的最低值,而 doubleLeast 返回同一行中多个列的最低值。它们类似于 SQL 中的 MIN 和 LEAST 函数。

示例

$builder 
  ->longSum('a', 'totalA')
  ->longSum('b', 'totalB')
  ->longSum('c', 'totalC')
  ->longLeast('lowestABC', ['a', 'b', 'c']);    

longLeast()doubleLeast() 后聚合器有以下参数

postJavascript()

postJavascript() 后聚合方法允许您对给定的字段应用给定的 JavaScript 函数。字段按给定顺序作为参数传递给 JavaScript 函数。

注意:基于JavaScript的功能默认是禁用的。请参阅Druid JavaScript编程指南,了解有关使用Druid的JavaScript功能的指南,包括如何启用它的说明: https://druid.apache.org/docs/latest/development/javascript.html

示例

$builder->postJavascript(
    'absPercent',
    'function(delta, total) { return 100 * Math.abs(delta) / total; }',
    ['delta', 'total']
);    

postJavascript() 后聚合方法有以下参数

hyperUniqueCardinality()

hyperUniqueCardinality() 后聚合器用于包装 hyperUnique 对象,以便可以在后聚合中使用。

示例

$builder
  ->count('rows')
  ->hyperUnique('unique_users', 'uniques')
  ->divide('averageUsersPerRow', function(PostAggregationsBuilder $builder){    
      $builder->hyperUniqueCardinality('unique_users');
      $builder->fieldAccess('rows');    
  });

hyperUniqueCardinality() 后聚合器有以下参数

quantile()

quantile() 后聚合器用于返回输入流假设排序版本中给定分数之前值的近似值。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Get the 95th percentile of the salaries per country.
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    ->quantile('quantile95', 'salaryData', 0.95) // this uses the data which was collected 

quantile() 后聚合器有以下参数

quantiles()

quantiles() 后聚合器返回与给定分数数组相对应的量级数组。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Get the 95th percentile of the salaries per country.
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    ->quantiles('quantile95', 'salaryData', [0.8, 0.95]) // this uses the data which was collected 

quantiles() 后聚合器有以下参数

histogram()

histogram() 后聚合器根据定义直方图区间或直方图数量的分割点数组返回直方图的近似值。一个由 m 个唯一、单调递增的分割点组成的数组将实数线划分为 m+1 个连续的、不相交的区间。区间的定义是包含左分割点,不包括右分割点。如果指定了直方图数量而不是分割点,则将最小值和最大值之间的区间划分为给定数量的等间距区间。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Create our builder
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    // This would spit the data in "buckets". 
    // It will return an array with the number of people earning, 1000 or less, 
    // the number of people earning 1001 to 1500, etc.
    ->histogram('salaryGroups', 'salaryData', [1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 5500]);  

histogram() 后聚合器有以下参数

参数 $splitPoints$numBins 是互斥的。

rank()

rank() 后聚合器返回给定值的近似排名,该值是小于该值的分布的分数。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Create our builder
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    // This will get the ranking of the value 2500 compared to all available "salary" values in the resultset.
    // The result will be a float between 0 and 1.
    ->rank('mySalaryRank', 'salaryData', 2500);  

rank() 后聚合器有以下参数

参数 $splitPoints$numBins 是互斥的。

cdf()

CDF 代表累积分布函数。

cdf() 后聚合器根据定义直方图边界的分割点数组返回累积分布函数的近似值。一个由 m 个唯一、单调递增的分割点组成的数组将实数线划分为 m+1 个连续的、不相交的区间。区间的定义是包含左分割点,不包括右分割点。结果数组中的分数可以视为每个分割点的排名,还有一个始终为 1 的额外排名。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Create our builder
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    ->cdf('salaryGroups', 'salaryData', [1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 5500]);

cdf() 后聚合器有以下参数

sketchSummary()

CDF 代表累积分布函数。

sketchSummary() 后聚合器返回可用于调试的 sketch 摘要。这是调用 toString() 方法的输出。

此方法使用 Apache DataSketches 库,应启用以使用此后聚合器。
有关更多信息,请参阅:https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html

示例

// Create our builder
$builder = $client->query('dataSource')
    ->interval('now - 1 hour', 'now')
    ->select('country')
    ->doublesSketch('salary', 'salaryData') // this collects the data 
    ->sketchSummary('debug', 'salaryData');

sketchSummary() 后聚合器有以下参数

示例输出

### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : true
   K                            : 128
   N                            : 28,025
   Levels (Needed, Total, Valid): 7, 7, 5
   Level Bit Pattern            : 1101101
   BaseBufferCount              : 121
   Combined Buffer Capacity     : 1,152
   Retained Items               : 761
   Compact Storage Bytes        : 6,120
   Updatable Storage Bytes      : 9,248
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 0.000000e+00
   Max Value                    : 8.000000e-03
### END SKETCH SUMMARY

QueryBuilder:搜索过滤条件

搜索过滤器是仅用于搜索查询的过滤器。它们允许您指定应应用于给定维度的哪个过滤器。

有几种不同的过滤器可用

searchContains()

searchContains() 方法允许您过滤维度,其中维度包含您给出的值。您可以指定匹配是否区分大小写。

示例

// Build a Search Query using a "contains" filter
$response = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->dimensions(['namespace'])
    ->searchContains('Wikipedia', true) // case sensitive!
    ->search();

searchContains() 方法有以下参数

searchFragment()

searchFragment() 方法允许您根据维度中包含所有给定字符串值的条件进行筛选。您可以指定匹配是否区分大小写。

示例

// Build a Search Query using a "fragment" filter.
$response = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->dimensions(['page'])
    ->searchFragment(['United', 'States'], true) // case sensitive!     
    ->search();

searchFragment() 方法有以下参数

searchRegex()

searchRegex() 方法允许您根据维度匹配给定的正则表达式进行筛选。

有关正则表达式的更多信息,请参阅此页面:https://docs.oracle.com/javase/6/docs/api/java/util/regex/Pattern.html

示例

// Build a Search Query using a "regex" filter.
$response = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->dimensions(['page'])
    ->searchRegex('^Wiki')      
    ->search();

searchRegex() 方法有以下参数

QueryBuilder:执行查询

以下方法允许您执行使用其他方法构建的查询。有各种查询类型可用,或者您可以使用 execute() 方法,该方法会尝试检测最适合您的查询的最佳查询类型。

execute()

此方法将分析您在查询构建器中提供的数据,并尝试使用最适合您的查询类型。如果您不想使用“内部逻辑”,您应使用以下方法之一。

$response = $builder
  ->select('channel')
  ->longSum('deleted')
  ->orderBy('deleted', OrderByDirection::DESC)
  ->execute();

execute() 方法有以下参数

您可以使用包含上下文参数的数组,或使用 QueryContext 对象(或与您选择的查询类型相关的任何上下文对象,如 ScanQueryContext)。有关查询特定上下文的更多信息,请参阅以下查询描述。

QueryContext 对象包含适用于所有查询的上下文属性。

响应

此方法的结果取决于执行的查询。每个查询都有自己的响应对象。然而,所有查询响应都扩展了 QueryResponse 对象。因此,每个查询响应都有一个 $response->raw() 方法,它将返回 druid 返回的原始数据数组。还有一个 $response->data() 方法,它将数据以“标准化”的方式返回,以便可以直接使用。

groupBy()

groupBy() 方法将以 GroupBy 查询的形式执行您构建的查询。

这是最常用的查询类型。然而,它不是最快的。如果您在以时间为唯一分组进行聚合或对单个维度进行有序的 groupBy,请考虑使用 Timeseries 和 TopN 查询以及 groupBy。

有关更多信息,请参阅此页面:https://druid.apache.org/docs/latest/querying/groupbyquery.html

使用 GroupBy 查询,您可以聚合度量值并按您选择的维度进行分组。

示例

$builder = $client->query('wikipedia', Granularity::HOUR);

$result = $builder 
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['namespace', 'page'])    
    ->count('edits')
    ->longSum('added')
    ->longSum('deleted')
    ->where('isRobot', 'false')
    ->groupBy();

groupBy() 方法有以下参数

上下文

groupBy() 方法接受 1 个参数,即查询上下文。这可以是一个键 => 值对的数组,或者一个 GroupByQueryContext 对象。

使用查询上下文的示例

$builder = $client->query('wikipedia', Granularity::HOUR);

$builder 
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['namespace', 'page'])    
    ->count('edits')
    ->longSum('added')
    ->longSum('deleted')
    ->where('isRobot', 'false');

// Create the query context 
$context = new GroupByQueryContext();
$context->setNumParallelCombineThreads(5);

// Execute the query using the query context.
$result = $builder->groupBy($context);

响应

此查询的响应将是 GroupByQueryResponse(这适用于两种查询策略)。
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。

topN()

topN() 方法将以 TopN 查询的形式执行您的查询。TopN 查询根据某些标准返回给定维度中值的排序集合。

有关 TopN 查询的更多信息,请参阅此页面:https://druid.apache.org/docs/latest/querying/topnquery.html

示例

$response = $client->query('wikipedia', Granularity::ALL)
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select('channel')
    ->count('edited')
    ->limit(10)
    ->orderBy('edited', 'desc')
    ->topN();

topN() 方法有以下参数

上下文

topN() 方法接收 1 个参数,即查询上下文。查询上下文是一个键 => 值对的数组,或者一个 TopNQueryContext 对象。上下文允许您更改查询执行的行为。

示例

$builder = $client->query('wikipedia', Granularity::ALL)
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select('channel')
    ->count('edited')
    ->limit(10)
    ->orderBy('edited', 'desc');

// Create specific query context for our query
$context = new TopNQueryContext();
$context->setMinTopNThreshold(1000);

// Execute the query
$response = $builder->topN($context);

响应

此查询的响应将是 TopNQueryResponse
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。

selectQuery()

selectQuery() 方法会将您的查询作为选择查询执行。不要将此方法与 select() 方法混淆,后者将选择查询的维度。

selectQuery() 返回原始的 Druid 数据。它不允许您聚合指标。它确实支持分页。

然而,在可能的情况下,鼓励使用扫描查询类型而不是选择查询。在涉及大量段的情况下,选择查询可能会产生非常高的内存和性能开销。扫描查询没有这个问题。两者之间主要的区别是,扫描查询不支持分页。然而,扫描查询类型能够在不进行分页的情况下返回几乎无限数量的结果,因此在许多情况下是不必要的。

更多信息请参阅: https://druid.apache.org/docs/latest/querying/select-query.html

示例

// Build a select query
$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user', 'deleted', 'added'])
    ->orderByDirection(OrderByDirection::DESC)
    ->limit(10);

// Execute the query.
$response = $builder->selectQuery($context);

// ... Use your response (page 1) here! ...

// echo "Identifier for page 2: " . var_export($response->pagingIdentifier(), true) . "\n\n";

// Now, request "page 2".
$builder->pagingIdentifier($response->pagingIdentifier());

// Execute the query.
$response = $builder->selectQuery($context);

// ... Use your response (page 2) here! ...

selectQuery() 方法有以下参数

上下文

selectQuery() 方法接收一个参数,即查询上下文。查询上下文可以是键值对数组,或 QueryContext 对象。没有 SelectQueryContext,因为此查询类型没有特定的上下文参数。上下文允许您更改查询执行的行为。

示例

// Example of setting query context. It can also be supplied as an array in the selectQuery() method call.
$context = new QueryContext();
$context->setPriority(100);

// Execute the query.
$response = $builder->selectQuery($context);

响应

此查询的响应将是 SelectQueryResponse
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。
$response->pagingIdentifier() 方法返回分页标识符。分页标识符可能是这样的

Array(
    'wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-12T14:15:44.694Z' => 19
)

scan()

scan() 方法会将您的查询作为扫描查询执行。扫描查询以流模式返回原始 Apache Druid(孵化)行。选择查询和扫描查询之间最大的区别是,扫描查询在返回给客户端之前不会保留所有返回的行。选择查询将保留行在内存中,如果返回太多的行,则可能导致内存压力。扫描查询可以在不发出另一个分页查询的情况下返回所有行。

更多信息请参阅此页面: https://druid.apache.org/docs/latest/querying/scan-query.html

示例

// Build a scan query
$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->select(['__time', 'channel', 'user', 'deleted', 'added'])
    ->orderByDirection(OrderByDirection::DESC)
    ->limit(10);

// Execute the query.
$response = $builder->scan();

scan() 方法有以下参数

上下文

scan() 方法的第一个参数是查询上下文。查询上下文可以是键值对数组,或 ScanQueryContext 对象。上下文允许您更改查询执行的行为。

示例

// Example of setting query context. It can also be supplied as an array in the scan() method call.
$context = new ScanQueryContext();
$context->setPriority(100);
$context->setMaxRowsQueuedForOrdering(5000);

// Execute the query.
$response = $builder->scan($context);

响应

此查询的响应将是 ScanQueryResponse
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。

ScanQueryResultFormat

您可以选择两种结果格式

对于 ScanQueryResultFormat::NORMAL_LIST$response->data() 示例

array (
  0 => 
  array (
    'timestamp' => '2015-09-12T23:59:59.200Z',
    '__time' => 1442102399200,
    'channel' => '#en.wikipedia',
    'user' => 'Eva.pascoe',
    'deleted' => 0,
    'added' => 182,
  ),
)

对于 ScanQueryResultFormat::COMPACTED_LIST$response->data() 示例

array (
  0 => 
  array (
    0 => '2015-09-12T23:59:59.200Z',
    1 => 1442102399200,
    2 => '#en.wikipedia',
    3 => 'Eva.pascoe',
    4 => 0,
    5 => 182,
  ),  
)

timeseries()

timeseries() 方法将您的查询作为时间序列查询执行。它将返回按给定时间粒度分组的的数据。

有关时间序列查询的更多信息,请参阅此页面: https://druid.apache.org/docs/latest/querying/timeseriesquery.html

示例

// Build a TimeSeries query
$builder = $client->query('wikipedia', Granularity::HOUR)
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->longSum('added')
    ->longSum('deleted')
    ->count('edited')
    ->select('__time', 'datetime')
    ->orderByDirection(OrderByDirection::DESC);

// Execute the query.
$response = $builder->timeseries();

timeseries() 方法有以下参数

上下文

timeseries() 方法接收一个参数,即查询上下文。查询上下文可以是键值对数组,或 TimeSeriesQueryContext 对象。上下文允许您更改查询执行的行为。

示例

// Example of setting query context. It can also be supplied as an array in the timeseries() method call.
$context = new TimeSeriesQueryContext();
$context->setSkipEmptyBuckets(true);

// Execute the query.
$response = $builder->timeseries($context);

响应

此查询的响应将是 TimeSeriesQueryResponse
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。

search()

search() 方法将您的查询作为搜索查询执行。搜索查询将返回匹配特定搜索选择的维度的唯一值。响应将包含匹配您搜索条件的维度、维度的值以及出现的次数。

有关搜索查询的更多信息,请参阅此页面:https://druid.apache.org/docs/latest/querying/searchquery.html

请参阅搜索过滤器,了解如何指定您的搜索过滤器。

示例

// Build a Search Query
$builder = $client->query('wikipedia')
    ->interval('2015-09-12 00:00:00', '2015-09-13 00:00:00')
    ->dimensions(['namespace']) // If left out, all dimensions are searched
    ->searchContains('wikipedia')
    ->limit(150);

// Execute the query, sorting by String Length (shortest first).
$response = $builder->search([], SortingOrder::STRLEN);

search()方法有以下参数

上下文

search()方法接收的第一个参数是查询上下文。查询上下文是一个键值对数组,或者一个QueryContext对象。该上下文允许您更改查询执行的行为。

示例

// Example of setting query context. It can also be supplied as an array in the search() method call.
$context = new QueryContext();
$context->setPriority(100);

// Execute the query.
$response = $builder->search($context);

响应

此查询的响应将是SearchQueryResponse
$response->raw() 方法将返回由 druid 返回的原始数据数组。
$response->data() 方法以“标准化”的方式返回数据数组,以便可以直接使用。

元数据

除了查询数据外,DruidClient类还允许您从Druid设置中提取元数据。

metadata()方法返回一个MetadataBuilder实例。使用此实例,您可以检索有关Druid设置的各种元数据信息。

以下是我们描述的最常用方法。

metadata()->intervals()

intervals()方法返回给定$dataSource的所有区间。

示例

$intervals = $client->metadata()->intervals('wikipedia');

intervals()方法有1个参数

它将返回如下响应

[
  "2019-08-19T14:00:00.000Z/2019-08-19T15:00:00.000Z" => [ "size" => 75208,  "count" => 4 ],
  "2019-08-19T13:00:00.000Z/2019-08-19T14:00:00.000Z" => [ "size" => 161870, "count" => 8 ],
]

metadata()->interval()

MetadataBuilder上的interval()方法将返回有关给定区间的所有详细信息。

示例

// retrieve the details regarding the given interval.
$response = $client->metadata()->interval('wikipedia', '2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z');

方法 interval() 有以下参数

它将返回以下数组

$response = [
    '2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z' =>
        [
            'wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-26T18:30:14.418Z' =>
                [
                    'metadata' =>
                        [
                            'dataSource'    => 'wikipedia',
                            'interval'      => '2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z',
                            'version'       => '2019-09-26T18:30:14.418Z',
                            'loadSpec'      =>
                                [
                                    'type' => 'local',
                                    'path' => '/etc/apache-druid-0.15.1-incubating/var/druid/segments/wikipedia/2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z/2019-09-26T18:30:14.418Z/0/index.zip',
                                ],
                            'dimensions'    => 'added,channel,cityName,comment,countryIsoCode,countryName,deleted,delta,isAnonymous,isMinor,isNew,isRobot,isUnpatrolled,metroCode,namespace,page,regionIsoCode,regionName,user',
                            'metrics'       => '',
                            'shardSpec'     =>
                                [
                                    'type'         => 'numbered',
                                    'partitionNum' => 0,
                                    'partitions'   => 0,
                                ],
                            'binaryVersion' => 9,
                            'size'          => 4817636,
                            'identifier'    => 'wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-26T18:30:14.418Z',
                        ],
                    'servers'  =>
                        [
                            0 => 'localhost:8083',
                        ],
                ],
        ],
];

metadata()->structure()

structure()方法创建一个代表给定dataSource结构的Structure对象。它将检索最后已知区间或您提供的区间的结构。

示例

// Retrieve the strucutre of our dataSource
$structure = $client->metadata()->structure('wikipedia');

structure()方法有以下参数

示例响应

Level23\Druid\Metadata\Structure Object
(
    [dataSource] => wikipedia
    [dimensions] => Array
        (
            [channel] => STRING
            [cityName] => STRING
            [comment] => STRING
            [countryIsoCode] => STRING
            [countryName] => STRING                        
            [isAnonymous] => STRING
            [isMinor] => STRING
            [isNew] => STRING
            [isRobot] => STRING
            [isUnpatrolled] => STRING            
            [namespace] => STRING
            [page] => STRING
            [regionIsoCode] => STRING
            [regionName] => STRING
            [user] => STRING
        )

    [metrics] => Array
        (
            [added] => LONG
            [deleted] => LONG
            [delta] => LONG
            [metroCode] => LONG 
        )
)

metadata()->timeBoundary()

timeBoundary()方法返回给定dataSource的时间边界。它找到给定dataSource中记录的第一和/或最后出现。

可选地,您还可以应用过滤器。例如,仅查看满足特定条件的记录的第一和/或最后出现时间。

返回类型因给定的$bound而异。如果给出TimeBound::BOTH(或null,含义相同),我们将返回包含minTime和maxTime的数组

array(
 'minTime' => \DateTime object,
 'maxTime' => \DateTime object
)

如果只请求一个时间,无论是TimeBound::MIN_TIME还是TimeBound::MAX_TIME,我们将返回一个DateTime对象。

timeBoundary()方法有以下参数

示例

// Example of only retrieving the MAX time
$response = $client->metadata()->timeBoundary('wikipedia', TimeBound::MAX_TIME, function(FilterBuilder $builder) {
    $builder->where('channel', '!=', '#vi.wikipedia');
});

echo $response->format('d-m-Y H:i:s');

// Example of only retrieving BOTH times
$response = $client->metadata()->timeBoundary('wikipedia', TimeBound::BOTH);

echo $response['minTime']->format('d-m-Y H:i:s') .' / '. $response['maxTime']->format('d-m-Y H:i:s');

metadata()->dataSources()

此方法将返回所有dataSource作为数组。

示例

// Retrieve all data sources
$dataSources = $client->metadata()->dataSources();

foreach($dataSources as $dataSource) { 
    // ...
}

metadata()->rowCount()

检索给定dataSource和区间中的行数。

rowCount()方法有以下参数

示例

// Retrieve the total records for the past week.
$numRows = $client->metadata()->rowCount("wikipedia", "now - 1 week", "now");

重新索引/压缩数据/终止

Druid以段的形式存储数据。当您想更新一些数据时,您必须重建整个段。因此,当数据仍然是“新鲜”时,我们使用较小的段。根据我们的经验,如果需要更新(重建)数据,那么通常都是新鲜数据。通过将新鲜数据保留在较小的段中,我们只需要重建1小时的数据,而不是整个月或更长时间。

例如,我们使用小时段用于“今天”和“昨天”,然后有一些进程会在之后将此数据转换为较大的段。

因此,重新索引和压缩数据对我们来说非常重要。在这里,我们向您展示如何使用它。

注意:当您重新索引数据时,Druid将收集数据并将其放入新的段中。旧段不会删除,但会被标记为未使用。这与Laravel软删除的原理相同。要永久删除未使用的段,您应使用kill任务。以下是一个示例。

默认情况下,我们已经添加了检查以确保您已选择一个完整的区间。这可以防止许多问题。如果您希望这样,我们添加了一个名为skipIntervalValidation的特殊上下文设置。当您将其设置为true时,我们将不会验证compact()reindex()方法提供的给定区间。

示例

// Build our compact task.
$taskId = $client->compact('wikipedia')
    ->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z ')
    ->segmentGranularity(Granularity::DAY) 
    ->execute([ 'skipIntervalValidation' => true ]); // Ignore interval validation. 

compact()

使用compact()方法,您可以创建一个压缩任务。压缩任务可以用来改变您现有数据的段大小。
压缩任务在内部生成一个具有一些固定参数的索引任务来执行压缩工作。

更多信息请参阅此页面:https://druid.apache.org/docs/latest/ingestion/data-management.html#compact

示例

$client = new DruidClient(['router_url' => 'http://127.0.0.1:8888']);

// Build our compact task.
$taskId = $client->compact('wikipedia')
    ->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z ')
    ->segmentGranularity(Granularity::DAY) // set our new segment size (it was for example "hour")
    ->execute();

echo "Inserted task with id: " . $taskId . "\n";

// Start polling task status.
while (true) {
    $status = $client->taskStatus($taskId);
    echo $status->getId() . ': ' . $status->getStatus() . "\n";

    if ($status->getStatus() != 'RUNNING') {
        break;
    }
    sleep(2);
}

// Or, simply use:
// $status = $client->pollTaskStatus($taskId);

echo "Final status: \n";
print_r($status->data());

compact方法将返回一个CompactTaskBuilder对象,允许您指定其他所需数据。

注意:我们目前还不支持构建metricSpec和DimensionSpec。

reindex()

使用reindex()方法,您可以重新索引已经在druid数据源中的数据。您可以做得比使用compact()方法更多。

例如,您可以过滤或转换现有数据或更改查询粒度

$client = new DruidClient(['router_url' => 'http://127.0.0.1:8888']);

// Create our custom input source.
$source = new DruidInputSource('wikipedia');
$source->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z');
$source->where('namespace', 'not like', '%Draft%');

// Build our reindex task
$taskId = $client->reindex('wikipedia-new')
    ->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z ')
    ->parallel()
    // Here we overwrite our "source" data, we define our own source data.
    ->inputSource($source) 
    ->segmentGranularity(Granularity::DAY)
    ->queryGranularity(Granularity::HOUR)
    ->rollup()
    ->transform(function (\Level23\Druid\Transforms\TransformBuilder $builder) {
        $builder->transform('"true"', 'isRobot');
        $builder->where('comment', 'like', '%Robot%');
    })
    ->execute();

echo "Inserted task with id: " . $taskId . "\n";

// Start polling task status.
while (true) {
    $status = $client->taskStatus($taskId);
    echo $status->getId() . ': ' . $status->getStatus() . "\n";

    if ($status->getStatus() != 'RUNNING') {
        break;
    }
    sleep(2);
}

// Or, simply use:
// $status = $client->pollTaskStatus($taskId);

echo "Final status: \n";
print_r($status->data());

reindex方法将返回一个IndexTaskBuilder对象,允许您指定其他所需数据。默认情况下,我们将使用DruidInputSource从现有数据源中导入数据。

如果您想改变读取数据的源,可以使用inputSource()方法。有关其他输入源,请参阅输入源章节。

kill()

kill()方法将返回一个KillTaskBuilder对象。这允许您指定区间和可选的任务ID。然后您可以执行它。

终止任务将删除与您提供的区间匹配的所有未使用的段。如果您经常重新索引数据,您可能也经常使用此任务,否则您将存储所有旧版本的数据。

如果您想删除尚未标记为未使用的段,可以使用markAsUnused()方法

示例

$client = new DruidClient(['router_url' => 'http://127.0.0.1:8888']);

// Build our kill task and execute it.
$taskId = $client->kill('wikipedia')
    ->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z ')
    ->markAsUnused() // mark segments as unused
    ->execute();

echo "Kill task inserted with id: " . $taskId . "\n";

// Start polling task status.
while (true) {
    $status = $client->taskStatus($taskId);
    echo $status->getId() . ': ' . $status->getStatus() . "\n";

    if ($status->getStatus() != 'RUNNING') {
        break;
    }
    sleep(2);
}

// Or, simply use:
// $status = $client->pollTaskStatus($taskId);

echo "Final status: \n";
print_r($status->data());

使用批索引作业导入数据

当您想手动将数据导入druid时,可以使用简单的index任务。当您要导入数据时,您必须指定一个输入源。输入源是数据读取的地方。

有各种输入源,例如本地文件、HTTP端点或从SQL源检索的数据。下面我们将描述所有可用的输入源,但首先我们将解释如何创建索引任务。

$client->index(...)方法返回一个IndexTaskBuilder对象,允许您指定您的索引任务。

重要的是要理解druid默认会替换您的SEGMENTS!所以,例如,如果您将数据存储在DAY段中,那么您必须在一个任务中导入整个段的数据。否则,第二个任务将替换之前的数据。

要解决这个问题,您可以使用appendToExisting(),这将允许您在不删除之前导入的数据的情况下向现有段追加。

有关IndexTaskBuilder的更多方法,请参阅下面的示例。在每个方法调用上方,我们都添加了一些注释作为解释

$client = new DruidClient(['router_url' => 'http://127.0.0.1:8888']);

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\HttpInputSource([
    'https://your-site.com/path/to/file1.json',
    'https://your-site.com/path/to/file2.json',
]);

# Now, build and execute our index task
$taskId = $client->index('myTableName', $inputSource)
    // specify the date range which will be imported.
    ->interval('now - 1 week', 'now')
    // Specify that we want to "rollup" our data 
    ->rollup()
    // We want to make segment files of 1 week of data
    ->segmentGranularity(Granularity::WEEK)
    // We want to be able to query at minimum level of HOUR data.
    ->queryGranularity(Granularity::HOUR)
    // Process the input source parallel (like multithreaded instead of 1 thread).
    ->parallel()
    // By default, an INDEX task will OVERWRITE _segments_. If you want to APPEND, use this: 
    ->appendToExisting()    
    // Set a unique id for this task.
    ->taskId('MY-TASK')
    // Specify your "time" column in your input source
    ->timestamp('time', 'posix')
    // Now we will add some dimensions which we want to add to our data-source.
    // These are the field names to read from input records, as well as the column name stored in generated segments.
    ->dimension('country', 'string')
    ->dimension('age', 'long')
    ->dimension('version', 'float')
    // You can also import spatial dimensions (x,y(,z)) coordinates
    ->spatialDimension('location', ['lat', 'long'])
    // Import multi-value dimensions
    ->multiValueDimension('tags', 'string')
    // Add the metrics which we want to ingest from our input source. (only when rollup is enabled!)
    ->sum('clicks', 'totalClicks', 'long')
    ->sum('visits', 'totalVisits', 'long')
    ->sum('revenue', 'profit', 'float')
    // Execute the task
    ->execute();
    
// If you want to stop your task (for whatever reason), you can call:    
// $client->cancelQuery($taskId);    
    
// Now poll for our final status    
$status = $client->pollTaskStatus($taskId);

echo "Final status: \n";
print_r($status->data());       

输入源

要索引数据,您需要指定数据读取的位置。您可以使用

AzureInputSource

AzureInputSource从您的Azure Blob存储或Azure Data Lake源读取数据。

重要!您需要将druid-azure-extensions作为扩展包含在内,才能使用Azure输入源。

构造函数允许您指定以下参数

以下参数之一是必需的。当您并行执行索引任务时,每个任务将处理给定对象中的一个(或多个)。

示例

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\AzureInputSource([
    'azure://bucket/file1.json',
    'azure://bucket/file2.json',
]);

# Now, start building your task (import it into a datasource called azureData) 
$indexTaskBuilder = $client->index('azureData', $inputSource);
// $indexTaskBuilder-> ...

GoogleCloudInputSource

GoogleCloudInputSource从您的Azure Blob存储或Azure Data Lake源读取数据。

重要!您需要将druid-google-extensions作为扩展来使用Google Cloud Storage输入源。

构造函数允许您指定以下参数

以下参数之一是必需的。当您并行执行索引任务时,每个任务将处理给定对象中的一个(或多个)。

示例

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\GoogleCloudInputSource([
    'gs://bucket/file1.json',
    'gs://bucket/file2.json',
]);

# Now, start building your task (import it into a datasource called googleData) 
$indexTaskBuilder = $client->index('googleData', $inputSource);
// $indexTaskBuilder-> ...

S3InputSource

S3InputSource从Amazon S3读取数据。

重要!您需要将druid-s3-extensions作为扩展来使用S3输入源。

构造函数允许您指定以下参数

以下参数之一是必需的。当您并行执行索引任务时,每个任务将处理给定对象中的一个(或多个)。

示例

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\S3InputSource(
    [
        's3://bucket/file1.json',
        's3://bucket/file2.json',
    ],
    [], // no prefixes
    [], // no objects
    [
        "accessKeyId" => "KLJ78979SDFdS2", 
        "secretAccessKey" => "KLS89s98sKJHKJKJH8721lljkd", 
        "assumeRoleArn" => "arn:aws:iam::2981002874992:role/role-s3",
    ]
);

# Now, start building your task (import it into a datasource called awsS3Data) 
$indexTaskBuilder = $client->index('awsS3Data', $inputSource);
// $indexTaskBuilder-> ...

HdfsInputSource

HdfsInputSource直接从HDFS存储读取文件。

重要!您需要将druid-hdfs-storage作为扩展来使用HDFS输入源。

构造函数允许您指定以下参数

当您并行执行索引任务时,每个任务将处理给定文件中的一个(或多个)。

示例

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\HdfsInputSource(
    ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
);

# Now, start building your task (import it into a datasource called hdfsData) 
$indexTaskBuilder = $client->index('hdfsData', $inputSource);
// $indexTaskBuilder-> ...

HttpInputSource

HttpInputSource直接通过HTTP从远程站点读取文件。

构造函数允许您指定以下参数

当您并行执行索引任务时,每个任务将处理给定文件(URI)中的一个(或多个)。

示例

// First, define your inputSource. 

// Example 1. Without Basic Authentication 
$inputSource = new \Level23\Druid\InputSources\HttpInputSource(
    ["http://example.com/uri1", "http://example2.com/uri2"]
);

// Example 2. In this example we have a plain username-password combination. 
$inputSource = new \Level23\Druid\InputSources\HttpInputSource(
    ["http://example.com/uri1", "http://example2.com/uri2"],
    "username",
    "password"
);

// Example 3. In this example we use the password provider. 
$inputSource = new \Level23\Druid\InputSources\HttpInputSource(
    ["http://example.com/uri1", "http://example2.com/uri2"],
    "username",
    [
        "type" => "environment",
        "variable" => "HTTP_INPUT_SOURCE_PW"
    ]
);

# Now, start building your task (import it into a datasource called httpData) 
$indexTaskBuilder = $client->index('httpData', $inputSource);
// $indexTaskBuilder-> ...

InlineInputSource

InlineInputSource直接从给定内容读取数据。它可以用于演示或快速测试解析和模式。

构造函数允许您指定以下参数

示例

// First, define your inputSource. 
$inputSource = new \Level23\Druid\InputSources\InlineInputSource([
    ["row1", 16, 9.18], 
    ["row2", 12, 9.22],
    // ...
]);

# Now, start building your task (import it into a datasource called inlineData) 
$indexTaskBuilder = $client->index('inlineData', $inputSource);
// $indexTaskBuilder-> ...

LocalInputSource

LocalInputSource直接从本地存储读取文件。

构造函数允许您指定以下参数

示例

// First, define your inputSource. 

// Example 1, specify the files to ingest
$inputSource = new \Level23\Druid\InputSources\LocalInputSource([
    ["/bar/foo/file.json", "/foo/bar/file.json"]
]);

// Example 2, specify a dir and wildcard for files to ingest
$inputSource = new \Level23\Druid\InputSources\LocalInputSource([
    [],
    "/path/to/dir",
    "*.json"
]);

# Now, start building your task (import it into a datasource called inlineData) 
$indexTaskBuilder = $client->index('inlineData', $inputSource);
// $indexTaskBuilder-> ...

DruidInputSource

DruidInputSource直接从现有的Druid段读取数据。

构造函数允许您指定以下参数

示例

// First, define your inputSource. 

// Example 1, specify the files to ingest
$inputSource = new \Level23\Druid\InputSources\DruidInputSource('hits');

// only process records from a week ago until now.
$inputSource->interval('now - 1 week', 'now');

// only process records matching these filters.
$inputSource->where('browser', 'Android');
$inputSource->whereIn('version', ['8', '9', '10']);
// etc.

# Now, start building your task (import it into a datasource called androidHits) 
$indexTaskBuilder = $client->index('androidHits', $inputSource);
// $indexTaskBuilder-> ...

SqlInputSource

SqlInputSource直接从数据库使用您指定的查询读取记录。在并行模式下,每个任务将处理一个或多个查询。

注意:如果您想使用mysql作为源,您必须在Druid中启用扩展mysql-metadata-storage。如果您想使用postgresql作为源,您必须在Druid中启用扩展postgresql-metadata-storage

由于此输入源具有固定的输入格式以读取事件,因此在使用此输入源时不需要在摄取规范中指定inputFormat字段。请在使用此输入源之前参考下面的推荐做法部分。

更多信息请见https://druid.apache.org/docs/latest/ingestion/native-batch.html#sql-input-source

构造函数允许您指定以下参数

示例

// First, define your inputSource. 

// Example 1, specify the files to ingest
$inputSource = new \Level23\Druid\InputSources\SqlInputSource(
    "jdbc:mysql://host:port/schema",
    "username",
    "password",
    [
        "select * from table where type = 'a'", 
        "select * from table where type = 'b'"
    ]
);
# Now, start building your task (import it into a datasource called mysqlData) 
$indexTaskBuilder = $client->index('mysqlData', $inputSource);
// $indexTaskBuilder-> ...

CombiningInputSource

CombiningInputSource允许您从多个位置检索数据。它结合了各种输入源方法。

此输入源仅在所有代理输入源都是可分割的并且可以被并行任务使用时使用。此输入源将识别其代理的拆分,并且每个拆分将由工作任务处理。与其他输入源类似,此输入源支持单个inputFormat。因此,请注意,需要inputFormat的代理输入源必须具有相同的数据输入格式。

构造函数允许您指定以下参数

示例

// First, define your inputSource. 

// Example 1, specify the files to ingest
$inputSource = new \Level23\Druid\InputSources\CombiningInputSource([
    new \Level23\Druid\InputSources\HttpInputSource(['http://127.0.0.1/file.json']),
    new \Level23\Druid\InputSources\S3InputSource(['s3://bucket/file2.json'])
]);

# Now, start building your task (import it into a datasource called combinedData) 
$indexTaskBuilder = $client->index('combinedData', $inputSource);
// $indexTaskBuilder-> ...

输入格式

对于大多数输入源,您还需要指定传入数据的格式。您可以使用输入格式来完成此操作。您可以在TaskBuilder中选择多个输入格式。以下是对它们的解释。

csvFormat()

csvFormat()允许您指定csv数据的构建方式。

此方法允许您指定以下参数

请注意,skipHeaderRows将在从标题中查找列名之前应用。例如,如果您将skipHeaderRows设置为2并将findColumnsFromHeader设置为true,则任务将跳过前两行,然后从第三行提取列信息。

示例

$inputSource = new HttpInputSource( /*...*/ );

$builder = $client->index('data', $inputSource)
    ->csvFormat(['name', 'age'], null, true, 2)
    //-> ....
;

tsvFormat()

tsvFormat()允许您指定tsv数据的构建方式。

此方法允许您指定以下参数

请确保将分隔符更改为数据相应的分隔符。像CSV一样,您必须指定列以及您想要索引的列的子集。

请注意,skipHeaderRows将在从标题中查找列名之前应用。例如,如果您将skipHeaderRows设置为2并将findColumnsFromHeader设置为true,则任务将跳过前两行,然后从第三行提取列信息。

示例

$inputSource = new HttpInputSource( /*...*/ );

$builder = $client->index('data', $inputSource)
    ->tsvFormat(['name', 'age'], "|", null, true, 2)
    //-> ....
;

jsonFormat()

jsonFormat()允许您指定数据的格式。

另请参阅

此方法允许您指定以下参数

flattenSpec对象在可能嵌套的输入数据(如JSON或Avro)与Druid的平面数据模型之间架起了桥梁。它是inputFormat对象中的对象。

$inputSource = new HttpInputSource( /*...*/ );

// Here we define how our fields are "read" from the input source. 
$spec = new FlattenSpec(true);
$spec->field(FlattenFieldType::ROOT, 'baz');
$spec->field(FlattenFieldType::JQ, 'foo_bar', '$.foo.bar');
$spec->field(FlattenFieldType::PATH, 'first_food', '.thing.food[1]');

$builder = $client->index('data', $inputSource)
    ->jsonFormat($spec, ['ALLOW_SINGLE_QUOTES' => true, 'ALLOW_UNQUOTED_FIELD_NAMES' => true])
    //-> ....
;

orcFormat()

orcFormat() 允许您指定 ORC 输入格式。但是,要使用此输入源,您应该已经将 druid-orc-extensions 添加到 druid。

参见

此方法允许您指定以下参数

flattenSpec 对象在可能嵌套的输入数据与 Druid 的平面数据模型之间架起桥梁。它是输入格式对象中的一个对象。

$inputSource = new HttpInputSource( /*...*/ );

// Here we define how our fields are "read" from the input source. 
$spec = new FlattenSpec(true);
$spec->field(FlattenFieldType::ROOT, 'baz');
$spec->field(FlattenFieldType::JQ, 'foo_bar', '$.foo.bar');
$spec->field(FlattenFieldType::PATH, 'first_food', '.thing.food[1]');

$builder = $client->index('data', $inputSource)
    ->orcFormat($spec, true)
    //-> ....
;

parquetFormat()

parquetFormat() 允许您指定 Parquet 输入格式。但是,要使用此输入源,您应该已经将 druid-parquet-extensions 添加到 druid。

参见

此方法允许您指定以下参数

flattenSpec 对象在可能嵌套的输入数据与 Druid 的平面数据模型之间架起桥梁。它是输入格式对象中的一个对象。

$inputSource = new HttpInputSource( /*...*/ );

// Here we define how our fields are "read" from the input source. 
$spec = new FlattenSpec(true);
$spec->field(FlattenFieldType::ROOT, 'baz');
$spec->field(FlattenFieldType::PATH, 'nested', '$.path.to.nested');

$builder = $client->index('data', $inputSource)
    ->parquetFormat($spec, true)
    //-> ....
;

protobufFormat()

parquetFormat() 允许您指定 Protobuf 输入格式。但是,要使用此输入源,您应该已经将 druid-protobuf-extensions 添加到 druid。

参见

此方法允许您指定以下参数

flattenSpec 对象在可能嵌套的输入数据与 Druid 的平面数据模型之间架起桥梁。它是输入格式对象中的一个对象。

$inputSource = new HttpInputSource( /*...*/ );

// Here we define how our fields are "read" from the input source. 
$spec = new FlattenSpec(true);
$spec->field(FlattenFieldType::ROOT, 'baz');
$spec->field(FlattenFieldType::PATH, 'someRecord_subInt', '$.someRecord.subInt');

$builder = $client->index('data', $inputSource)
    ->protobufFormat([
        "type" => "file",
        "descriptor" => "file:///tmp/metrics.desc",
        "protoMessageType" => "Metrics"
    ], $spec)
    //-> ....
;