PHP的列式分析 - 一个用于以高性能方式读取和写入简单列式文件的纯PHP库。

v1.0.4 2024-04-16 13:42 UTC

This package is auto-updated.

Last update: 2024-09-16 14:25:29 UTC


README

列式分析(纯PHP实现)

在GitHub上:https://github.com/envoymediagroup/columna

关于项目

它做什么?

这个库允许您以轻量级、纯PHP实现的方式,以高性能方式编写和读取简单的列式文件格式。

为什么要在PHP中进行列式分析?

这个库最初是Envoy Media Group的“挠痒痒”项目。我们需要快速、适合我们全部PHP栈的列式分析,但发现PHP对主流列式格式(Parquet、ORC等)的支持和性能不足。因此,我们开发了简单的列式格式,并拥有自己的快速写入器和读取器。

它的实战经验如何?

这个库自2022年初以来一直作为Envoy分析和企业智能的骨干在生产中使用。它每天处理数十万次的读取和写入,为业务用户定制报告以及监控和机器学习应用的自动请求。根据我们每天在生产中使用这个库的经验,持续进行错误修复、功能添加和改进。

安装

使用Composer将此库添加到您的项目中

composer require envoymediagroup/columna

文件格式

这个库使用什么文件格式来存储数据?文件扩展名.scf代表简单列式格式,它很简单:文件的所有元数据、其列及其定义和偏移量都存储在第一行的JSON标题中。其余记录是类似于CSV的数据,采用列式排列(每列对应文件中的一行),使用RLE压缩和记录分隔符字符作为RLE分隔符。对字符串应用了一些额外的转义,以增加可以存储和检索的有效值的范围。在此处查看示例文件。

用法

写入器

每个列式文件特定于一个日期和一个度量,具有任意数量的维度。在此示例中,我们将假设一个名为clicks的度量以及三个名为platform_idsite_idurl的维度。请注意,我们将头部和值作为单独的输入提供给写入器;当处理大型数据集并希望通过不在每个数组项上重复关联字符串键来保留一些内存时,这样做是有意义的。

数据类型

目前支持的数据类型包括字符串、整数、浮点数和布尔值,以及特殊的“datetime”类型。除了在评估查询条件时,datetime被视为字符串之外,它们在strtotime()解析和与整数操作(>、<、=等)比较时被视为字符串。目前不支持嵌套数据。虽然可以在字符串类型中存储JSON或其他序列化,但这些值不会被引擎反序列化,因此不能用于评估嵌套值。列定义包括一个空值,它将始终用于数据集中的null的替代品,因此文件中不存储null,在读取文件时也不返回。

用法

下面我们将通过注释来展示如何使用写入器

<?php
require('../vendor/autoload.php');

// Import the classes we need
use EnvoyMediaGroup\Columna\Writer;
use EnvoyMediaGroup\Columna\Reader;
use EnvoyMediaGroup\Columna\ColumnDefinition;

// Create or retrieve our data set
$array = [
    [
        'clicks' => 12,
        'platform_id' => 2,
        'site_id' => 7,
        'url' => 'https://www.foo.com',
    ],
    [
        'clicks' => 31,
        'platform_id' => 2,
        'site_id' => 9,
        'url' => 'https://www.barbaz.net',
    ],
    //... etc.
];

// Define our Metric and our Dimensions
// The names should match the keys in your data set
$MetricDefinition = new ColumnDefinition(
    ColumnDefinition::AXIS_TYPE_METRIC, // metric or dimension
    'clicks',                           // name (should match the keys in your data set)
    ColumnDefinition::DATA_TYPE_INT,    // data type (string, int, float, bool, datetime)
    null,                               // precision (for floats)
    0                                   // empty value (matching the specified data type)
);

$DimensionDefinitions = [
    new ColumnDefinition(
        ColumnDefinition::AXIS_TYPE_DIMENSION,
        'platform_id',
        ColumnDefinition::DATA_TYPE_INT,
        null,
        0
    ),
    new ColumnDefinition(
        ColumnDefinition::AXIS_TYPE_DIMENSION,
        'site_id',
        ColumnDefinition::DATA_TYPE_INT,
        null,
        0
    ),
    new ColumnDefinition(
        ColumnDefinition::AXIS_TYPE_DIMENSION,
        'url',
        ColumnDefinition::DATA_TYPE_STRING,
        null,
        ''
    ),
];

// Set our output path and the date for this file's data
$date = '2022-07-08';
$file_path = "/data_directory/{$date}/{$MetricDefinition->getName()}." . Reader::FILE_EXTENSION;

// Instantiate the Writer 
$Writer = new Writer();

// The Writer expects headers (string keys) separate from data (0-indexed).
//   If your data is associative like the above, you can separate it with
//   this helper function.
list($headers,$data) = $Writer->separateHeadersAndData($array);

// Write the columnar file
$Writer->writeFile(
    $date,
    $MetricDefinition,
    $DimensionDefinitions,
    $headers,
    $data,
    $file_path,
    // Some optional flags:
    //$do_rle_compression = true,   // Perform run-length encoding (RLE) compression
    //$do_cardinality_sort = false, // Sort data by cardinality of columns before RLE
    //$lock_output_file = true      // Acquire an exclusive lock when writing output file
);

现在我们在$file_path处有一个完整的文件。

CombinedWriter

常规的 Writer 允许您将基于行的数据集转换为列文件。然后 CombinedWriter 允许您将多个现有的列文件组合成一个新的列文件,其中包含提供文件中的所有数据。这仅适用于您提供的所有文件都是同一指标、同一日期、相同列的情况。您可以使用此功能将生成数据集和文件的工作分配到大量工作者中,然后使用另一个工作者将这些结果合并成一个包含该指标在该日期上所有数据的单个大文件。您可以使用以下方式使用它

<?php
require('../vendor/autoload.php');

// Import classes
use EnvoyMediaGroup\Columna\CombinedWriter;
use EnvoyMediaGroup\Columna\ColumnDefinition;

// Set our arguments
$date = '2022-07-08';
$metric = 'clicks';
$partial_files = [
    "/tmp/{$date}/{$metric}/partial_1.scf",
    "/tmp/{$date}/{$metric}/partial_2.scf",
    "/tmp/{$date}/{$metric}/partial_3.scf",
    //... etc.
];
$combined_file_path = "/data_directory/{$date}/{$metric}." . Reader::FILE_EXTENSION;

// See Writer example above for metric and dimension definitions
$MetricDefinition = new ColumnDefinition(...); 
$DimensionDefinitions = [...];

// Write the combined file from the partial files
$Writer = new CombinedWriter();
$response = $Writer->writeCombinedFile(
    $date,
    $MetricDefinition,
    $DimensionDefinitions,
    $partial_files,
    $combined_file_path
    // Some optional flags:
    //$lock_output_file = true      // Acquire an exclusive lock when writing output file
);

// If you want to remove the partial files after creating the combined file:
foreach ($partial_files as $partial_file) {
    unlink($partial_file);
}

现在我们有一个文件位于 $combined_file_path,其中包含我们从收集到的 $partial_files 数组中的所有数据。

Reader

以下是读取文件的方法。请注意,此库包含 ReaderBundledReader 类。它们都做同样的事情,您可以使用它们互换,但使用 BundledReader 可以获得轻微的性能提升,因为它减少了 PHP 需要执行的 include() 的次数。这是一个小的胜利,在规模扩大时可以累积起来。

带参数调用,获取数组结果

以参数正常调用 Reader

<?php

// Import the needed classes
use EnvoyMediaGroup\Columna\BundledReader as Reader;
use EnvoyMediaGroup\Columna\Constraint;

// Specify our metric and date, and the corresponding file path
$metric = 'clicks';
$date = '2022-07-08';
$file_path = "/data_directory/{$date}/{$metric}." . Reader::FILE_EXTENSION;

// Set what dimensions we want to include in our results
$dimensions = [
    'platform_id',
    'site_id',
];

// Define our constraints
// Constraints are ANDed within groups, ORed between groups
// This example is equivalent to the following SQL:
//   SELECT * FROM file WHERE (platform_id = 7 AND site_id in (1,3,17)) OR (url LIKE '%sale%');
$constraints = [
    [
        (new Constraint("platform_id",Constraint::EQUALS,7))->toArray(),
        (new Constraint("site_id",Constraint::IN,[1,3,17]))->toArray(),
    ],
    [
        (new Constraint("url",Constraint::CONTAINS,"sale"))->toArray(),
    ]
];

// Group the results by the dimensions we asked for (in this case, platform_id and site_id)
$do_aggregate = true;
// Don't provide extra metadata with sum/count/min/max for each grouping, just aggregate the values
$do_aggregate_meta = false;

$Reader = new Reader();
$Reader->run(
    $date,
    $metric,
    $dimensions,
    $constraints,
    $do_aggregate,
    $do_aggregate_meta,
    $file_path
);

$metadata = $Reader->getMetadata(); // Metadata about the request and results; see sample below.
$data = $Reader->getResults(); // Results of the request; see sample below.

带 JSON 字符串工作负载调用,获取 JSON+CSV 字符串结果

Reader 设计为在运行大量请求时易于使用,这些请求分布在许多工作者进程上,使用 RPC 或消息框架,如 AWS SQS、RabbitMQ 或我们自己的 envoymediagroup/lib-rpc。因此,Reader 可以接受字符串作为其输入,并以字符串作为其输出。请求字符串是 Reader 参数的 JSON 序列化。对于结果字符串,第一行是响应元数据的编码 JSON,以下行是编码为 CSV 的结果数据,带有一点额外的转义,以在编码/解码字符串时提供更多的安全性。Response 类将为您处理反序列化。请确保使用此 Response 类来解析结果,因为它将正确处理转义字符串。

一个示例调用者

<?php

use EnvoyMediaGroup\Columna\Response;

// Craft your request
$workload_array = [
    "date" => "2022-07-08",
    "metric" => "clicks",
    "dimensions" => ["platform_id","site_id"],
    "constraints" => [
        [
            [
                "name" => "platform_id",
                "comparator" => ">=",
                "value" => 5,
            ],
        ],
    ],
    "do_aggregate" => true,
    "do_aggregate_meta" => false,
    "file" => "path/to/file.scf",
];
$workload = json_encode($workload_array);

// Transmit that workload over a network with your RPC framework of choice...
$result_string = $SomeRpcClient->request($workload);

// Unserialize the result with the Response class
$Response = new Response($result_string);

$metadata = $Response->getMetadata();
$results  = $Response->getResults();

一个示例工作者

<?php

use EnvoyMediaGroup\Columna\BundledReader as Reader;

$workload = $SomeRpcClient->receive();

$Reader = new Reader();
$Reader->runFromWorkload($workload);
$result_string = $Reader->getResponsePayload();

// Return that result string over your RPC framework...
$SomeRpcClient->respond($result_string);

元数据

元数据看起来像这样

Array(
  'date' => '2022-07-08', // Date of the file
  'metric' => 'clicks',   // Name of the metric in the file
  'status' => 'success',  // 'success' if records were found, 'empty' if no records were found, 'error' on failure
  'min' => 1,   // Least metric value among the records
  'max' => 64,  // Greatest metric value among the records
  'sum' => 102, // Total metric value among the records
  'matched_row_count' => 102, // Number of records in the file that matched your constraints prior to aggregation
  'result_row_count' => 17,   // Number of records in the result set after aggregation
  'column_meta' => Array( // Description of the columns in the result set
    0 => Array(
      // MD5 is automatically prepended. Records with matching dimension values will have matching md5 hashes.
      // This is helpful if you need to aggregate multiple Reader results together.
      'definition' => Array(
        'axis_type' => 'dimension',
        'name' => 'md5',
        'data_type' => 'string',
        'empty_value' => '',
      ),
      'index' => 0, // Numerical index in each result record that corresponds to this column
    ),
    1 => Array(
      'definition' => Array(
        'axis_type' => 'metric',
        'name' => 'clicks',
        'data_type' => 'int',
        'precision' => NULL,
        'empty_value' => 0,
      ),
      'index' => 1,
    ),
    2 => Array(
      'definition' => Array(
        'axis_type' => 'dimension',
        'name' => 'platform_id',
        'data_type' => 'int',
        'precision' => NULL,
        'empty_value' => 0,
      ),
      'index' => 2,
    ),
    3 => Array(
      'definition' => Array(
        'axis_type' => 'dimension',
        'name' => 'site_id',
        'data_type' => 'int',
        'precision' => NULL,
        'empty_value' => 0,
      ),
      'index' => 3,
    ),
  ),
  'is_aggregated' => true, // Read back of whether these results are aggregated on matching dimension values.
  'aggregate_includes_meta' => false, // Read back of whether the results include metadata for each aggregate grouping.
  'host' => 'worker-1', // Result of php_uname('n'), helpful for tracing/debugging
  'ms_elapsed' => 32,   // Milliseconds it took to complete your request
)

结果

结果数据集看起来像这样。请注意,您可以通过在元数据的 'column_meta' 中引用 'index' 字段来将每个记录中的索引映射到适当的列名称。

Array(
    0 => Array (
        0 => 'a060e57689d68664f873561a78e002d9',
        1 => 3,
        2 => 58,
        3 => 1,
    ),
    1 => Array (
        0 => 'f9f0a70e4d259b63914ccc98ed438d0e',
        1 => 16,
        2 => 54,
        3 => 1,
    ),
    2 => Array (
        0 => '166dea5e7a502516e662d389239bd2fc',
        1 => 4,
        2 => 75,
        3 => 1,
    ),
    // ... etc.
)

问答

为什么你没有使用库 X、内置函数 Y 或设计模式 Z?

简短的回答是性能。我将此库的要求保持得尽可能小,以便使自动加载非常轻量级并减少 include() 文件所花费的时间,当您优化到每个毫秒时,这会迅速累积起来。PHP 的许多内置数组函数实际上比 foreach 遍历相同的数组运行得慢。具有更多抽象的设计模式意味着更多的类和更多的重量。保持简单可以保持快速。

问题、功能请求

查看打开的问题,以获取已知问题的完整列表或提交问题或功能请求。

当然,如果您发现任何严重的错误或安全漏洞,请立即创建问题并通知我(以下为联系方式)。

贡献

贡献使开源社区成为一个如此美妙的学习、灵感和创造的地方。您所做的任何贡献都非常感谢

如果您有改进此项目的建议,请fork存储库并创建一个拉取请求。您也可以简单地打开一个带有“增强”标签的问题。别忘了给项目加星!再次感谢!

  1. 分支项目
  2. 复制 .env.base.env(必需)并更新任何环境变量(可选)
  3. 运行 docker-compose up
  4. 创建你的功能分支(git checkout -b feature/AmazingFeature
  5. 进行修改
  6. 运行 docker exec -it columna composer run test 确保单元测试通过
  7. 运行 docker exec -it columna composer run bundle 创建新的 BundledReader.php
  8. 提交你的更改(git commit -m '添加一些AmazingFeature'
  9. 推送到分支(git push origin feature/AmazingFeature
  10. 打开一个拉取请求

许可证

根据MIT许可证分发。更多信息请参阅LICENSE

联系信息

创建者:Ryan Marlow

Twitter:@myanrarlow

邮箱:ryanmarlow.oss@gmail.com

致谢

以下是我在这个项目中找到的一些有用的资源。