keboola/db-import-export

该包允许从多个云存储将文件导入Snowflake

v2.9.0 2024-08-07 14:12 UTC

This package is auto-updated.

Last update: 2024-09-07 14:31:56 UTC


README

支持的操作

  • ABS 加载/导入csv到 SnowflakeSynapse
  • GCS 加载/导入csv到 Bigquery
  • SnowflakeSynapse 卸载/导出表到 ABS

功能

导入

  • 全量加载 - 在加载之前截断目标表
  • 增量加载 - 数据合并
  • 所有引擎的主键去重
  • 将空值转换为NULL(使用convertEmptyValuesToNull选项)

导出

  • 全量卸载 - 目标csv总是重写

开发

Docker

准备 .env.env.dist的副本)并设置AWS密钥,以便访问keboola-drivers存储桶以构建此映像。此外,将此用户添加到ci-php-import-export-lib组,这将允许您使用为测试创建的新存储桶。

可以在Dev - Main legacy中创建用户,其中还有keboola-driversci-php-import-export-lib的组。

如果您没有访问keboola-drivers的权限,您必须更改Dockerfile。

然后运行 docker compose build

AWS凭据必须也有权访问在AWS_S3_BUCKET中指定的存储桶。此存储桶必须包含测试数据。运行 docker compose run --rm dev composer loadS3 将它们加载。

准备

Azure

  • 在ABS配置中可以找到创建存储账户模板
  • 在存储账户中创建容器Blob service -> Containers 注意:对于测试,此步骤可以跳过,容器可以通过loadAbs命令创建
  • 在.env文件中填写环境变量
ABS_ACCOUNT_NAME=storageAccount
ABS_ACCOUNT_KEY=accountKey
ABS_CONTAINER_NAME=containerName
  • 将测试用例上传到ABS docker compose run --rm dev composer loadAbs

Google cloud storage

  • GCS中创建存储桶,将存储桶名称设置在.env变量GCS_BUCKET_NAME

  • IAM中创建服务账户

  • 在存储桶权限中授予服务账户对存储桶的管理员访问权限

  • 创建新的服务账户密钥

  • 将密钥转换为字符串 awk -v RS= '{$1=$1}1' <key_file>.json >> .env(或cat file.json | jq -c | jq -R

  • 将.env文件最后一行内容设置为变量GCS_CREDENTIALS

  • 将测试用例上传到GCS docker compose run --rm dev composer loadGcs-bigquerydocker compose run --rm dev composer loadGcs-snowflake(根据后端而定)

SNOWFLAKE

角色、用户、数据库和仓库是测试所需的。您可以创建它们

CREATE ROLE "KEBOOLA_DB_IMPORT_EXPORT";
CREATE DATABASE "KEBOOLA_DB_IMPORT_EXPORT";

GRANT ALL PRIVILEGES ON DATABASE "KEBOOLA_DB_IMPORT_EXPORT" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";
GRANT USAGE ON WAREHOUSE "DEV" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";

CREATE USER "KEBOOLA_DB_IMPORT_EXPORT"
PASSWORD = 'Password'
DEFAULT_ROLE = "KEBOOLA_DB_IMPORT_EXPORT";

GRANT ROLE "KEBOOLA_DB_IMPORT_EXPORT" TO USER "KEBOOLA_DB_IMPORT_EXPORT";

-- For GCS create storage integration https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#creating-a-custom-iam-role
CREATE STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT"
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<your gcs bucket>/');
-- set integration name to env GCS_INTEGRATION_NAME in .env file
-- get service account id `STORAGE_GCP_SERVICE_ACCOUNT`
DESC STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT";
-- continue according manual ^ in snflk documentation assign roles for Data loading and unloading

SYNAPSE

在Azure门户或使用CLI上创建synapse服务器。

设置环境变量:SYNAPSE_UID SYNAPSE_PWD SYNAPSE_DATABASE SYNAPSE_SERVER

运行查询

CREATE MASTER KEY;

这将为polybase创建主密钥。

托管标识

在vnet中使用ABS时需要托管标识。文档 如何设置和使用托管标识在 文档 中描述

TLDR;在ABS的IAM中添加角色分配 "Blob Storage Data {Reader or Contributor}" 到您的Synapse服务器主体

Exasol

您可以在Docker中本地运行Exasol或使用SaaS。

在Docker中本地运行Exasol

在docker中运行Exasol(对于此案例,.env已预配置)

docker compose up -d exasol

在其他地方运行Exasol服务器并设置环境变量

EXASOL_HOST=
EXASOL_USERNAME=
EXASOL_PASSWORD=

在SaaS中运行Exasol

登录到SaaS UI(或使用本地客户端)并创建具有以下权限的用户。

CREATE USER "<nick>_ie" IDENTIFIED BY "password";

GRANT 
CREATE SESSION,
CREATE SCHEMA,
CREATE TABLE,
CREATE VIEW,
CREATE USER,
CREATE ROLE,
DROP USER,
DROP ANY ROLE,
GRANT ANY ROLE,
ALTER ANY SCHEMA,
ALTER USER,
IMPORT,
EXPORT
TO "<nick>_ie"
WITH ADMIN OPTION;

获取主机(带有端口)、用户名和密码(从上一步)并在.env中填写,如上所述。确保您的账户已为您IP启用网络。

Teradata

在AWS/Azure上准备Teradata服务器并设置以下属性。见

为测试创建新数据库

CREATE DATABASE <nick>_ie_lib_tests FROM dbc
    AS PERMANENT = 1e8,
       SPOOL = 1e8;
TERADATA_HOST=
TERADATA_USERNAME=
TERADATA_PASSWORD=
TERADATA_PORT=
TERADATA_DATABASE=

Bigquery

安装 Google Cloud客户端(通过Brew),初始化它并登录到 生成默认凭据

要准备后端,您可以使用 Terraform模板。您必须具有组织的resourcemanager.folders.create权限。

# you can copy it to a folder somewhere and make an init
terraform init

使用以下变量运行terraform apply

  • folder_id:转到 GCP资源管理器 并选择您的团队开发文件夹ID(例如,找到'KBC Team Dev'并复制ID)
  • backend_prefix:你的名字,所有资源都将使用此前缀创建
  • billing_account_id:转到 计费 并复制您的计费账户ID
terraform apply -var folder_id=<folder_id> -var backend_prefix=<your_prefix> -var billing_account_id=<billing_account_id>

有关缺失部分的说明,请参阅 连接存储库。Terraform apply完成后,转到由Terraform创建的文件夹中的服务项目。

  1. 将密钥转换为字符串并保存到.env文件中:awk -v RS= '{$1=$1}1' <key_file>.json >> .env
  2. .env文件最后一行的内容设置为变量BQ_KEY_FILE
  3. 设置环境变量BQ_BUCKET_NAME,由TF模板中的file_storage_bucket_id生成

测试

使用以下命令运行测试。

注意:必须提供Azure凭据并上传 fixtures

docker compose run --rm dev composer tests

单元和功能测试可以单独运行

#unit test
docker compose run --rm dev composer tests-unit

#functional test
docker compose run --rm dev composer tests-functional

代码质量检查

#phplint
docker compose run --rm dev composer phplint

#phpcs
docker compose run --rm dev composer phpcs

#phpstan
docker compose run --rm dev composer phpstan

完整的CI工作流程

此命令将运行所有检查,加载 fixtures 并运行测试

docker compose run --rm dev composer ci

用法

Snowflake

ABS -> Snowflake import/load

use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$absSourceFile = new Storage\ABS\SourceFile(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $absSourceFile,
    $snowflakeDestinationTable,
    $importOptions
);

Snowflake -> Snowflake copy

use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $snowflakeSourceTable,
    $snowflakeDestinationTable,
    $importOptions
);

Snowflake -> ABS export/unload

use Keboola\Db\ImportExport\Backend\Snowflake\Exporter;
use Keboola\Db\ImportExport\ExportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$absDestinationFile = new Storage\ABS\DestinationFile(...);
$exportOptions = new ExportOptions(...);

(new Exporter($snowflakeConnection))->exportTable(
    $snowflakeSourceTable,
    $absDestinationFile,
    $exportOptions
);

Synapse next(实验性)

导入到Synapse

use Keboola\TableBackendUtils\Table\SynapseTableDefinition;
use Keboola\TableBackendUtils\Table\SynapseTableQueryBuilder;
use Keboola\Db\ImportExport\Backend\Synapse\ToStage\StageTableDefinitionFactory;
use Keboola\Db\ImportExport\Storage;
use Keboola\Db\ImportExport\Backend\Synapse\ToStage\ToStageImporter;
use Keboola\Db\ImportExport\Backend\Synapse\SynapseImportOptions;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\IncrementalImporter;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\FullImporter;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\SqlBuilder;
use Doctrine\DBAL\Connection;

$importSource = new Storage\ABS\SourceFile(...);
// or
$importSource = new Storage\Synapse\Table(...);
// or
$importSource = new Storage\Synapse\SelectSource(...);

$destinationTable = new SynapseTableDefinition(...);
$options = new SynapseImportOptions(...);
$synapseConnection = new Connection(...);

$stagingTable = StageTableDefinitionFactory::createStagingTableDefinition(
    $destinationTable,
    $importSource->getColumnsNames()
);
$qb = new SynapseTableQueryBuilder($synapseConnection);
$synapseConnection->executeStatement(
    $qb->getCreateTableCommandFromDefinition($stagingTable)
);
$toStageImporter = new ToStageImporter($synapseConnection);
$toFinalTableImporter = new IncrementalImporter($synapseConnection);
// or
$toFinalTableImporter = new FullImporter($synapseConnection);
try {
    $importState = $toStageImporter->importToStagingTable(
        $importSource,
        $stagingTable,
        $options
    );
    $result = $toFinalTableImporter->importToTable(
        $stagingTable,
        $destinationTable,
        $options,
        $importState
    );
} finally {
    $synapseConnection->executeStatement(
        (new SqlBuilder())->getDropTableIfExistsCommand(
            $stagingTable->getSchemaName(),
            $stagingTable->getTableName()
        )
    );    
}

内部/扩展

库由几个简单的接口组成。

创建新的后端

导入器、导出器接口必须在新的后端中实现

Keboola\Db\ImportExport\Backend\ImporterInterface
Keboola\Db\ImportExport\Backend\ExporterInterface

每个后端都有一个相应的适配器,该适配器支持自己组合的源接口和目标接口。可以使用 setAdapters 方法设置自定义适配器。

创建新存储

存储现在为文件存储 ABS|S3(未来)或表格存储 Snowflake|Synapse。存储可以有 SourceDestination,这些必须实现 SourceInterfaceDestinationInterface。这些接口为空,适配器需要支持自己的组合。一般来说,每个 FileStorage <=> TableStorage 组合都有一个导入/导出适配器。

适配器必须实现

  • 用于导入的 Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface
  • 用于导出的 Keboola\Db\ImportExport\Backend\BackendExportAdapterInterface

后端可能需要自己的扩展 AdapterInterface(Synapse 和 Snowflake 现在是这样的)。

许可证

MIT 许可,请参阅 LICENSE 文件。