ccmbenchmark/bigquery-bundle

使用 Symfony 批量上传数据到 Google bigquery

安装: 5,684

依赖项: 0

建议者: 0

安全: 0

星级: 0

关注者: 5

分支: 0

开放性问题: 0

类型:symfony-bundle

1.1.3 2024-01-26 12:37 UTC

This package is auto-updated.

Last update: 2024-09-26 13:58:39 UTC


README

此捆绑包提供了一种简单的方法,可以将数据批量上传到 Google bigquery。

概念

使用此捆绑包需要了解以下 3 个概念

实体

实体是任何实现了 \CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowInterface 接口的对象。此接口扩展了 JsonSerializable 以处理导出到 bigquery。实体与一个元数据对象相关联。

元数据

元数据负责存储与您的实体相关的模式以及其他用于将数据存储到 bigquery 的信息。元数据是实现了 \CCMBenchmark\BigQueryBundle\BigQuery\MetadataInterface 接口的一个类。

单元工作

单元工作由捆绑包提供。它负责存储数据,然后将其上传到 bigquery。

完整示例

<?php
class MyMetadata implements CCMBenchmark\BigQueryBundle\BigQuery\MetadataInterface {
    public function getEntityClass(): string {
        return MyEntity::class;
    }

    public function getDatasetId(): string {
        return 'mydataset';
    }
    public function getProjectId(): string {
        return 'myproject';
    }
    public function getTableId(): string {
        return 'mytable';
    }
    public function getSchema(): array {
        return [
            [ "mode"=> "NULLABLE", "name"=> "sessions", "type"=> "INTEGER" ]
        ];
    }
}

class MyEntity implements CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowInterface {
    use CCMBenchmark\BigQueryBundle\BigQuery\Entity\RowTrait;

    private $sessions;
    public function __construct(int $sessions) {
        $this->sessions = $sessions;
    }

    public function jsonSerialize()
    {
        return [
            'sessions' => $this->sessions,
            'created_at' => (new \Datetime())->format('Y-m-d H:i:s'),
        ];
    }
}

// @var $unitOfWork \CCMBenchmark\BigQueryBundle\BigQuery\UnitOfWork
$unitOfWork->addData(
    new MyEntity(
       1000
    )
);

// Your data will be uploaded when calling this method
$unitOfWork->flush();

入门

安装此包

  1. 要求包 composer require ccmbenchmark/bigquery-bundle
  2. 将其添加到您的内核

Symfony 4+:

<?php
//config/bundles.php
return [
    (..)
    \CCMBenchmark\BigQueryBundle\BigQueryBundle::class => ['all' => true]
]

Symfony 3.4:

<?php
//app/AppKernel.php
$bundles = array(
    (...)
    new \CCMBenchmark\BigQueryBundle\BigQueryBundle(),
);

在 google cloud storage 和 google bigquery 上设置您的项目

要使用 google cloud storage 上传数据到 google big query,您需要

  1. 一个有效的 google cloud project,并具有以下 API:BigQuery API、Cloud Storage
  2. 一个有效的 服务帐户,具有 json api 标识符
  3. 设置您的账户的计费

注意:使用 google cloud storage 和 google big query 可能会产生 Google 的费用。因此,使用此捆绑包可能会产生账户费用。您对此负责。

设置捆绑包

#config/packages/big_query.yml
big_query:
    cloudstorage:
        bucket: [Name of your bucket in google cloud storage]
    api:
        application_name: "My application"
        credentials_file: "[Path to your credentials in json format]"
    proxy: ## Remove this section if you don't have any proxy or set the values to "~"
        host: "%proxy.host%"
        port: "%proxy.port"

创建和声明您的元数据

要创建元数据,创建一个新的类实现 MetadataInterface。

为了自动将您的元数据注册到 UnitOfWork 中,此捆绑包提供了一个可以在该服务上声明的标签。

//config/services/services.xml
<service id="AppBundle\MyMetadata">
    <tag name="big_query.metadata" />
</service>

此时,您的元数据已通过 CompilerPass 声明到 UnitOfWork 中,您现在可以上传数据了。

使用 UnitOfWork 服务

您需要使用服务 CCMBenchmark\BigQueryBundle\BigQuery\UnitOfWork。此服务提供了一个简单的 API。

将数据上传到 google bigquery

调用 addData 将新的实体存储到上传。当所有实体都处于 UnitOfWork 中时,调用 flush 上传它们。

从 google bigquery 请求数据

调用 requestData 向指定的 projectId 发送请求,它将返回一个 \Google\Service\Bigquery\GetQueryResultsResponse

<?php
class myDataSource
{
    private string $projectId;
    private UnitOfWork $unitOfWork;
    
    public function __construct(UnitOfWork $unitOfWork, string $projectId)
    {
        $this->unitOfWork = $unitOfWork;
        $this->projectId = $projectId;
    }

    public function getData(\DateTimeImmutable $reportDate, array $sites): array
    {
    $queryResults = $this->unitOfWork->queryData($this->projectId, 'SELECT field1, field2, field3 FROM myDataset.myTable');

    $data = [];
    foreach ($queryResults->getRows() as $row) {
        $data[] = $row->current()->getV();
    }
}

调试

如果代码没有抛出异常,但您在 bigquery 中找不到您的数据,您应该按照以下步骤操作

  1. 检查 google cloud storage。应该有一个名为 "reporting-[YYYY-mm-dd]-[uniqId].json" 的文件在这里。如果没有此类文件,请检查 GCP 上的权限。
  2. 检查 google big query。应已提交一个新作业。请确保检查项目历史记录。如果作业出错,请尝试使用 UI 重新提交它,bigquery 将显示错误。

清理 google cloud storage

这不在本包的作用范围内,但为了节省存储空间,您可以在您的存储桶中定义一个生命周期