chocofamilyme/bigquery

A library for working with BigQuery

2.0.0 2019-09-10 07:38 UTC

This package is auto-updated.

Last update: 2024-09-10 19:41:53 UTC


README

Features

  • Insert data into BigQuery tables
  • Run queries in BigQuery

Requirements

  • Phalcon 3.x+
  • PHP 7.0+

Configuration

To use the library, create a configuration file named analytics with the following fields:

analytics.php

...
return new \Phalcon\Config([
    'analytics' => [
        'dataset'       => 'holding',
        'queueName'     => 'analytics',
        'exchangeType'  => 'direct',

        'undeliveredDataModel' => \Helper\Analytics\Models\UndeliveredDataMock::class,
        'connection' => [
            'keyFilePath' => __DIR__.'/_data/keys/key.json',
            //'keyFile'     => {},
        ],
        'mappers' => [
            'tableName' => \Chocofamily\Analytics\NullMapper::class,
        ],

        'repeater'    => [
            'attempt' => 5,
            'exclude' => [
                \InvalidArgumentException::class,
                \Google\Cloud\Core\Exception\NotFoundException::class,
            ],
        ],

        'pathStorage' => __DIR__.'/storage',
    ],
]);
...

The file test/bootstrap.php contains an example of adding the configuration to the DI container.

Example migration for the UndeliveredData model
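
The repeater section of the configuration above lists a number of attempts and a set of excluded exception classes. The README does not show how the library applies them, so the following is only a minimal sketch of one plausible reading: retry an operation up to attempt times, but rethrow immediately when the exception is one of the excluded classes. The function retryWithExclusions is hypothetical and is not part of chocofamilyme/bigquery.

```php
<?php

/**
 * Hypothetical helper, NOT part of chocofamilyme/bigquery.
 * Calls $operation up to $attempts times. Exceptions whose class is listed
 * in $exclude are rethrown at once instead of being retried.
 */
function retryWithExclusions(callable $operation, int $attempts, array $exclude)
{
    if ($attempts < 1) {
        throw new \InvalidArgumentException('attempts must be at least 1');
    }

    $lastException = null;

    for ($try = 1; $try <= $attempts; $try++) {
        try {
            return $operation($try);
        } catch (\Exception $e) {
            foreach ($exclude as $excludedClass) {
                if ($e instanceof $excludedClass) {
                    throw $e; // assumed: retrying cannot fix this kind of failure
                }
            }
            $lastException = $e;
        }
    }

    // Every attempt failed with a retryable exception.
    throw $lastException;
}

// Usage: the callback fails twice with a retryable error, then succeeds.
$calls  = 0;
$result = retryWithExclusions(function ($try) use (&$calls) {
    $calls++;
    if ($try < 3) {
        throw new \RuntimeException('temporary failure');
    }
    return 'sent';
}, 5, [\InvalidArgumentException::class]);
```

Under this reading, listing \InvalidArgumentException and NotFoundException in exclude avoids repeating requests that would fail the same way every time.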

$table = $this->table('undelivered_data');
$table->addColumn('table_name', 'string', ['null' => false]);
$table->addColumn('data', 'text', ['null' => false]);
$table->addColumn('status', 'integer', ['default' => 0, 'limit' => 1]);
$table->addTimestamps()->create();
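
The resend example in this README decodes the data column with json_decode, which suggests the payload is stored as JSON text. As a rough illustration, a row matching the migration above could be built as follows. The table name and payload are hypothetical, and the meaning of the status values is not documented here.

```php
<?php

// Hypothetical table name and payload, used only for illustration.
$tableName = 'chocolife_test';
$payload   = ['id' => 1, 'created_at' => '2018-11-20'];

// Row shaped like the undelivered_data migration: table_name, data, status.
$row = [
    'table_name' => $tableName,
    'data'       => json_encode($payload), // stored in the text column
    'status'     => 0,                     // the migration's default value
];

// Decoding with the associative flag restores the original array.
$restored = json_decode($row['data'], true);
```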

Example: streaming data into BigQuery

$bufferSize = 50;
$validator = new SenderValidator();
$streamer  = new StreamerWrapper($validator, $bufferSize);

$mapperClass = $this->config['mappers']->get($body['table_name'], NullMapper::class);
$mapper = new $mapperClass;

$streamer->setMapper($mapper);
$streamer->validator->setClientData($data);
$streamer->send();

Inserting data using jobs

Use this for loading large volumes of data, such as reports.

$validator = new SenderValidator();
$runner    = new RunnerWrapper($validator);

$mapperClass = $this->config['mappers']->get($body['table_name'], NullMapper::class);
$mapper = new $mapperClass;

$runner->setMapper($mapper);
$runner->validator->setClientData($data);
$runner->send();

Example: resending and deleting undelivered data

$limit = 100;

// toArray() returns a plain array, so settings are read with array keys.
$analytics = $this->getDI()->getShared('config')->analytics->toArray();
$provider  = new BigQuery($analytics);

do {
    $undeliveredDataService = new UndeliveredData($analytics['undeliveredDataModel']);
    $undeliveredDataSet = $undeliveredDataService->findAllUndelivered($limit);

    foreach ($undeliveredDataSet as $undeliveredData) {
        $data = \json_decode($undeliveredData->data, true);
        $provider->setTable($undeliveredData->table_name);
        // Delete the stored row only after a successful insert.
        if ($provider->insert($data)) {
            $undeliveredData->delete();
        }
    }
} while ($undeliveredDataSet->count() >= $limit);

Running queries against BigQuery

If the query does not specify a LIMIT, LIMIT 100 is applied by default.

$query = "SELECT * FROM holding.chocolife_test WHERE created_at = \"2018-11-20\" LIMIT 100";
$provider = new BigQuery($this->getDI()->getShared('config')->analytics->toArray());
$result = $provider->runQuery($query);
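
To make the default LIMIT behaviour described above concrete, here is a minimal sketch of how a missing LIMIT could be filled in before a query is sent. The function applyDefaultLimit is hypothetical and only illustrates the idea. The library's actual implementation is not shown in this README and may differ.

```php
<?php

/**
 * Hypothetical helper, NOT the library's own code.
 * Appends "LIMIT $default" when the query contains no LIMIT clause.
 */
function applyDefaultLimit(string $query, int $default = 100): string
{
    // Drop surrounding whitespace and any trailing semicolons.
    $trimmed = rtrim(trim($query), "; \t\n\r");

    // Naive check: any "LIMIT <number>" counts, even inside a subquery.
    if (preg_match('/\bLIMIT\s+\d+/i', $trimmed) === 1) {
        return $trimmed;
    }

    return $trimmed . ' LIMIT ' . $default;
}

// Usage: one query without a LIMIT, one that already has its own.
$withDefault = applyDefaultLimit('SELECT * FROM holding.chocolife_test');
$unchanged   = applyDefaultLimit('SELECT * FROM holding.chocolife_test LIMIT 10');
```

Because the check is a simple pattern match, a query whose only LIMIT sits in a subquery would be left without an outer limit. A real implementation would need to parse the query.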