chocofamilyme / bigquery
用于处理BigQuery的库
2.0.0
2019-09-10 07:38 UTC
Requires
- php: >= 7.0.0
- ext-phalcon: >= 3.0.0
- google/cloud-bigquery: ^1.4
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-10 19:41:53 UTC
README
功能
- 将数据插入BigQuery表
- 在BigQuery中执行查询
要求
- Phalcon 3.x+
- PHP 7.0+
配置
要使用此库,需要创建一个名为 analytics 的配置文件,包含以下字段
analytics.php
// ...
// analytics.php — configuration consumed by this library
// (the surrounding file context is omitted in this README excerpt).
return new \Phalcon\Config([
    'analytics' => [
        'dataset'      => 'holding',
        'queueName'    => 'analytics',
        'exchangeType' => 'direct',
        // Model class used to persist rows that could not be delivered to BigQuery.
        'undeliveredDataModel' => \Helper\Analytics\Models\UndeliveredDataMock::class,
        'connection' => [
            'keyFilePath' => __DIR__.'/_data/keys/key.json',
            //'keyFile' => {},
        ],
        'mappers' => [
            'tableName' => \Chocofamily\Analytics\NullMapper::class,
        ],
        'repeater' => [
            'attempt' => 5,
            // Exceptions listed here are NOT retried by the repeater.
            'exclude' => [
                \InvalidArgumentException::class,
                \Google\Cloud\Core\Exception\NotFoundException::class,
            ],
        ],
        'pathStorage' => __DIR__.'/storage',
    ],
]);
// ...
在 test/bootstrap.php
文件中有将配置添加到 DI 的示例。
UndeliveredData 模型的迁移示例
// Migration (Phinx-style API) for the table backing the UndeliveredData model.
$table = $this->table('undelivered_data');
$table->addColumn('table_name', 'string', ['null' => false]);
$table->addColumn('data', 'text', ['null' => false]);
// status: delivery state flag; defaults to 0 (undelivered).
$table->addColumn('status', 'integer', ['default' => 0, 'limit' => 1]);
$table->addTimestamps()->create();
示例:将数据流式插入BigQuery
// Example: streaming rows into BigQuery through the buffered streamer.
$bufferSize = 50;
$validator  = new SenderValidator();
// FIX: the original snippet was missing the trailing semicolon on this line.
$streamer = new StreamerWrapper($validator, $bufferSize);

// Resolve the mapper registered for this table, falling back to NullMapper.
$mapperClass = $this->config['mappers']->get($body['table_name'], NullMapper::class);
$mapper = new $mapperClass;

$streamer->setMapper($mapper);
$streamer->validator->setClientData($data);
$streamer->send();
使用作业插入数据
用于加载大量数据,例如报告。
// Example: inserting a large batch (e.g. reports) via a load job instead of streaming.
$validator = new SenderValidator();
// FIX: the original snippet was missing the trailing semicolon on this line.
$runner = new RunnerWrapper($validator);

// Resolve the mapper registered for this table, falling back to NullMapper.
$mapperClass = $this->config['mappers']->get($body['table_name'], NullMapper::class);
$mapper = new $mapperClass;

$runner->setMapper($mapper);
$runner->validator->setClientData($data);
$runner->send();
示例:重新发送和删除未送达的数据
// Example: re-send undelivered rows in pages of $limit, deleting each row
// once it has been successfully inserted into BigQuery.
$limit = 100;

// FIX: toArray() yields a plain array, so config values must be read with
// array syntax — the original used object syntax ($analytics->undeliveredDataModel).
$analytics = $this->getDI()->getShared('config')->analytics->toArray();
$provider  = new BigQuery($analytics);

do {
    $undeliveredDataService = new UndeliveredData($analytics['undeliveredDataModel']);
    $undeliveredDataSet = $undeliveredDataService->findAllUndelivered($limit);

    foreach ($undeliveredDataSet as $undeliveredData) {
        $data = \json_decode($undeliveredData->data, true);

        // FIX: the original called $bigQuery->setTable(), but no $bigQuery
        // variable exists in this snippet — the provider instance is $provider.
        $provider->setTable($undeliveredData->table_name);

        if ($provider->insert($data)) {
            // Delete the row only after a confirmed successful re-delivery.
            $undeliveredData->delete();
        }
    }
    // A full page means there may be more undelivered rows to process.
} while ($undeliveredDataSet->count() >= $limit);
向BigQuery发送请求
如果在查询中未指定 LIMIT,则默认使用 LIMIT 100。
// Example: running a query against BigQuery.
// Note: per the library docs, LIMIT 100 is applied by default when the query omits it.
// FIX: the original snippet was missing the trailing semicolon after the string.
$query = "SELECT * FROM holding.chocolife_test WHERE created_at = \"2018-11-20\" LIMIT 100";
$provider = new BigQuery($this->getDI()->getShared('config')->analytics->toArray());
$result = $provider->runQuery($query);