ccmbenchmark / bigquery-bundle
使用 Symfony 批量上传数据到 Google bigquery
Requires
- php: >=7.1
- ext-curl: *
- ext-json: *
- google/apiclient: ^2.10
- google/cloud-bigquery: ^1.23
- google/cloud-storage: ^1.25
- guzzlehttp/guzzle: ^5.3|^6.0|^7.0
- symfony/config: ^4.2|^5.0|^6.0
- symfony/dependency-injection: ^4.1.12|^5.0|^6.0
- symfony/http-kernel: ^4.0|^5.0|^6.0
Requires (Dev)
- atoum/atoum: ^4.0
- atoum/stubs: *
README
此捆绑包提供了一种简单的方法,可以将数据批量上传到 Google bigquery。
概念
使用此捆绑包需要了解以下 3 个概念
实体
实体是任何实现了 \CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowInterface
接口的对象。此接口扩展了 JsonSerializable 以处理导出到 bigquery。实体与一个元数据对象相关联。
元数据
元数据负责存储与您的实体相关的模式以及其他用于将数据存储到 bigquery 的信息。元数据是实现了 \CCMBenchmark\BigQueryBundle\BigQuery\MetadataInterface
接口的一个类。
单元工作
单元工作由捆绑包提供。它负责存储数据,然后将其上传到 bigquery。
完整示例
<?php class MyMetadata implements CCMBenchmark\BigQueryBundle\BigQuery\MetadataInterface { public function getEntityClass(): string { return MyEntity::class; } public function getDatasetId(): string { return 'mydataset'; } public function getProjectId(): string { return 'myproject'; } public function getTableId(): string { return 'mytable'; } public function getSchema(): array { return [ [ "mode"=> "NULLABLE", "name"=> "sessions", "type"=> "INTEGER" ] ]; } } class MyEntity implements CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowInterface { use CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowTrait; private $sessions; public function __construct(int $sessions) { $this->sessions = $sessions; } public function jsonSerialize() { return [ 'sessions' => $this->sessions, 'created_at' => (new \Datetime())->format('Y-m-d H:i:s'), ]; } } // @var $unitOfWork \CCMBenchmark\BigQueryBundle\BigQuery\UnitOfWork $unitOfWork->addData( new MyEntity( 1000 ) ); // Your data will be uploaded when calling this method $unitOfWork->flush();
入门
安装此包
- 要求包
composer require ccmbenchmark/bigquery-bundle
- 将其添加到您的内核
Symfony 4+:
<?php //config/bundles.php return [ (..) \CCMBenchmark\BigQueryBundle\BigQueryBundle::class => ['all' => true] ]
Symfony 3.4:
<?php //app/AppKernel.php $bundles = array( (...) new \CCMBenchmark\BigQueryBundle\BigQueryBundle(), );
在 google cloud storage 和 google bigquery 上设置您的项目
要使用 google cloud storage 上传数据到 google big query,您需要
- 一个有效的 google cloud project,并具有以下 API:BigQuery API、Cloud Storage
- 一个有效的 服务帐户,具有 json api 标识符
- 设置您的账户的计费
注意:使用 google cloud storage 和 google big query 可能会产生 Google 的费用。因此,使用此捆绑包可能会产生账户费用。您对此负责。
设置捆绑包
#config/packages/big_query.yml big_query: cloudstorage: bucket: [Name of your bucket in google cloud storage] api: application_name: "My application" credentials_file: "[Path to your credentials in json format]" proxy: ## Remove this section if you don't have any proxy or set the values to "~" host: "%proxy.host%" port: "%proxy.port"
创建和声明您的元数据
要创建元数据,创建一个新的类实现 MetadataInterface。
为了自动将您的元数据注册到 UnitOfWork 中,此捆绑包提供了一个可以在该服务上声明的标签。
//config/services/services.xml <service id="AppBundle\MyMetadata"> <tag name="big_query.metadata" /> </service>
此时,您的元数据已通过 CompilerPass 声明到 UnitOfWork 中,您现在可以上传数据了。
使用 UnitOfWork 服务
您需要使用服务 CCMBenchmark\BigQueryBundle\BigQuery\UnitOfWork
。此服务提供了一个简单的 API。
将数据上传到 google bigquery
调用 addData
将新的实体存储到上传。当所有实体都处于 UnitOfWork 中时,调用 flush
上传它们。
从 google bigquery 请求数据
调用 requestData
向指定的 projectId
发送请求,它将返回一个 \Google\Service\Bigquery\GetQueryResultsResponse
<?php class myDataSource { private string $projectId; private UnitOfWork $unitOfWork; public function __construct(UnitOfWork $unitOfWork, string $projectId) { $this->unitOfWork = $unitOfWork; $this->projectId = $projectId; } public function getData(\DateTimeImmutable $reportDate, array $sites): array { $queryResults = $this->unitOfWork->queryData($this->projectId, 'SELECT field1, field2, field3 FROM myDataset.myTable'); $data = []; foreach ($queryResults->getRows() as $row) { $data[] = $row->current()->getV(); } }
调试
如果代码没有抛出异常,但您在 bigquery 中找不到您的数据,您应该按照以下步骤操作
- 检查 google cloud storage。应该有一个名为 "reporting-[YYYY-mm-dd]-[uniqId].json" 的文件在这里。如果没有此类文件,请检查 GCP 上的权限。
- 检查 google big query。应已提交一个新作业。请确保检查项目历史记录。如果作业出错,请尝试使用 UI 重新提交它,bigquery 将显示错误。
清理 google cloud storage
这不在本包的作用范围内,但为了节省存储空间,您可以在您的存储桶中定义一个生命周期。