keboola /db-import-export
该包允许从多个云存储将文件导入Snowflake
Requires
- php: ^8.1
- ext-json: *
- ext-pdo: *
- doctrine/dbal: ^3.3
- google/cloud-bigquery: ^1.23
- google/cloud-storage: ^1.27
- keboola/csv-options: ^1
- keboola/php-csv-db-import: ^6
- keboola/php-datatypes: ^7.6
- keboola/php-file-storage-utils: ^0.2.2
- keboola/php-temp: ^2.0
- keboola/table-backend-utils: >=2.7
- microsoft/azure-storage-blob: ^1.4
- symfony/process: ^4.4|^5.0|^6.0
Requires (Dev)
- keboola/coding-standard: ^15
- php-parallel-lint/php-parallel-lint: ^1.3
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^1.4
- phpstan/phpstan-phpunit: ^1
- phpunit/phpunit: ^9
- react/async: ^4||^3
- symfony/finder: ^5.4
- dev-main
- v2.9.0
- v2.8.3
- v2.8.2
- v2.8.1
- v2.8.0
- v2.7.0
- v2.6.3
- v2.6.2
- v2.6.1
- v2.6.0
- v2.5.0
- v2.4.7
- v2.4.6
- v2.4.5
- v2.4.4
- v2.4.3
- v2.4.2
- v2.4.1
- v2.4.0
- v2.3.0
- v2.2.2
- v2.2.1
- v2.2.0
- v2.1.0
- v2.0.0
- v1.21.2
- v1.21.1
- v1.21.0
- v1.20.1
- v1.20.0
- v1.19.2
- v1.19.1
- v1.19
- v1.18.0
- v1.17.0
- v1.16.1
- v1.16.0
- v1.15.0
- v1.14.0
- v1.13.0
- v1.12.0
- v1.11.0
- v1.10.0
- v1.9.1
- v1.9.0
- v1.8.0
- v1.7.0
- v1.6.1
- v1.6.0
- v1.5.2
- v1.5.1
- v1.5.0
- v1.4.0
- v1.3.1
- v1.3.0
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.0
- v1.0.6
- v1.0.5
- v1.0.4
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
- v0.24.0
- v0.23.0
- v0.22.0
- v0.21.0
- v0.20.1
- v0.20
- v0.19
- v0.18
- v0.17
- v0.16.15
- v0.16.14
- v0.16.13
- v0.16.12
- v0.16.11
- v0.16.10
- v0.16.9
- v0.16.8
- v0.16.7
- v0.16.6
- v0.16.5
- v0.16.4
- v0.16.3
- v0.16.2
- v0.16.1
- v0.16.0
- v0.15.4
- v0.15.3
- v0.15.2
- v0.15.1
- v0.15.0
- v0.14.1
- v0.14.0
- v0.13.0
- v0.12.2
- v0.12.1
- v0.12.0
- v0.11.0
- v0.10.0
- v0.9.2
- v0.9.1
- v0.9.0
- v0.8.0
- v0.7.1
- v0.7.0
- v0.6.0
- v0.5.1
- v0.5.0
- v0.4.0
- v0.3.0
- v0.2.0
- v0.1.2
- v0.1.1
- v0.1
- dev-zajca-fix-distinct-on-nonnative-tables-2
- dev-zajca-ct-1642
- dev-zajca-ct-950-ignore-columns
- dev-zajca-fix-wrong-tests
- dev-zajca-big-256
- dev-martinj-db-import-export-terraform-fix
- dev-martinj-fix-phpstan
- dev-martinj-ct-1361-default-value-for-null-conversion-in-bq
- dev-CT-933-add-release-development-branch-tag-to-able-to-require-in-other-lib-in-process-of-programming
- dev-CT-1169-put-column-definition-endpoint
- dev-jirka-1271-new-command
- dev-jirka-ct-1271-add-alter-command
- dev-jirka-ct-1331-add-protobuf-for-table-info-in-preview
- dev-zajca-ct-1301
- dev-erik-metadata-backend
- dev-BIG-208-create-table-definition-sql-injection
- dev-BIG-207-bq-import-user-exception-wrong-timestamp-format
- dev-jirka-big-193-convert-load-exception
- dev-martin-GCP-445
- dev-zajca-tag.php
- dev-backup-cache
- dev-CT-905-dont-run-build-if-create-tag-on-master-branch
- dev-adamvyborny-CM-727-php-datatypes-oracle
- dev-martin-build-ecr
- dev-big-160-update-common
- dev-jirka-big-167-too-many-requests-exception
- dev-zajca-big-171
- dev-zajca-big-185
- dev-zajca-big-169
- dev-zajca-big-170
- dev-zajca-ct-1128-1
- dev-big-153-runtime-options
- dev-zajca-ct-1118-no-bc
- dev-revert-84-zajca-ct-1118
- dev-big-160
- dev-zajca-ct-1118
- dev-zajca-BIG-155-ASCII
- dev-BIG-126-external-buckets
- dev-BIG-126-external-buckets-2
- dev-big-153-roman-improve-type-hint
- dev-jirka-ct-1084-add-table-type-bq-td
- dev-jirka-ct-910-external-tables
- dev-zajca-BIG-157
- dev-zajca-big-142
- dev-zajca-fix-zero-length
- dev-zajca-new-err-code
- dev-zajca-kbc-1003
- dev-CM-569-ondra
- dev-jirka-ct-924-re-enable-exasol-start-stop
- dev-disable-td
- dev-zajca-CT-666-snflk-null
- dev-php81
- dev-CT-950-ignore_timestamp
- dev-roman-finish-release
- dev-ct-835-fixx-export-null
- dev-ct-843-fix-numeric-value-is-empty-string
- dev-CT-843-null-import
- dev-PST-631_SNFLK-add-missing-types
- dev-roman-add-release-phase
- dev-add-ie-lib-repo
- dev-odbc-test
- dev-roman-hackaton-parquet
- dev-zajca-synapse-checksum
- dev-zajca-kbc-2902
- dev-zajca-tmp-deps
- dev-roman-kbc-2798-table-create
- dev-zajca-exa-7.1
- dev-zajca-debug-exa
- dev-zajca-kbc-1668
- dev-zajca-php74
- dev-zajca-kbc-1560
- dev-zajca-kbc-1516
- dev-zajca-kbc-1224
- dev-zajca-kbc-1251
- dev-zajca-optimize-file-to-table
- dev-azure-pipelines
- dev-zajca-synapse-polybase
- dev-zajca-fix-less-insert
- dev-zajca-kbc-1058
- dev-zajca-kbc-1032
- dev-zajca-synapse-temp-table
- dev-zajca-fix-ns
- dev-zajca-kbc-799
- dev-zajca-fix-temptable-as-heap
- dev-KBC-500-spk
- dev-zajca-snflk-2.21.3
- dev-zajca-harden-escaping-export-test
- dev-zajca-run-dedup-in-transaction
- dev-zajca-remove-skipped-tests
- dev-zajca-kbc-168-synapse
- dev-zajca-kbc-168-synapse-export
- dev-zajca-update-loader
- dev-zajca-sync-dep-version
- dev-zajca-export-bind
- dev-zajca-fix-php71
- dev-zajca-s3-support
- dev-zajca-kbc-58
- dev-zajca-csv-options
- dev-zajca-readme
- dev-zajca-export-snflk-azure
- dev-zajca-7.1
- dev-zajca-genrator
- dev-zajca-switch-connection-lib
This package is auto-updated.
Last update: 2024-09-07 14:31:56 UTC
README
支持的操作
- 从
ABS
加载/导入csv到Snowflake
或Synapse
- 从
GCS
加载/导入csv到Bigquery
- 从
Snowflake
或Synapse
卸载/导出表到ABS
功能
导入
- 全量加载 - 在加载之前截断目标表
- 增量加载 - 数据合并
- 所有引擎的主键去重
- 将空值转换为NULL(使用convertEmptyValuesToNull选项)
导出
- 全量卸载 - 目标csv总是重写
开发
Docker
准备 .env
(.env.dist
的副本)并设置AWS密钥,以便访问keboola-drivers
存储桶以构建此映像。此外,将此用户添加到ci-php-import-export-lib
组,这将允许您使用为测试创建的新存储桶。
可以在Dev - Main legacy
中创建用户,其中还有keboola-drivers
和ci-php-import-export-lib
的组。
如果您没有访问keboola-drivers
的权限,您必须更改Dockerfile。
- 取消注释第一个阶段,该阶段下载Teradata驱动程序和工具,并提供从Teradata网站下载的自己的版本
- 工具:https://downloads.teradata.com/download/tools/teradata-tools-and-utilities-linux-installation-package-0
- 驱动程序:https://downloads.teradata.com/download/connectivity/odbc-driver/linux
- 在Dockerfile中将
COPY --from=td
命令更改为您本地Teradata包的副本
然后运行 docker compose build
AWS凭据必须也有权访问在AWS_S3_BUCKET
中指定的存储桶。此存储桶必须包含测试数据。运行 docker compose run --rm dev composer loadS3
将它们加载。
准备
Azure
- 在ABS配置中可以找到创建存储账户模板
- 在存储账户中创建容器
Blob service -> Containers
注意:对于测试,此步骤可以跳过,容器可以通过loadAbs
命令创建 - 在.env文件中填写环境变量
ABS_ACCOUNT_NAME=storageAccount
ABS_ACCOUNT_KEY=accountKey
ABS_CONTAINER_NAME=containerName
- 将测试用例上传到ABS
docker compose run --rm dev composer loadAbs
Google cloud storage
-
在GCS中创建存储桶,将存储桶名称设置在.env变量
GCS_BUCKET_NAME
中 -
在IAM中创建服务账户
-
在存储桶权限中授予服务账户对存储桶的管理员访问权限
-
创建新的服务账户密钥
-
将密钥转换为字符串
awk -v RS= '{$1=$1}1' <key_file>.json >> .env
(或cat file.json | jq -c | jq -R
) -
将.env文件最后一行内容设置为变量
GCS_CREDENTIALS
-
将测试用例上传到GCS
docker compose run --rm dev composer loadGcs-bigquery
或docker compose run --rm dev composer loadGcs-snowflake
(根据后端而定)
SNOWFLAKE
角色、用户、数据库和仓库是测试所需的。您可以创建它们
CREATE ROLE "KEBOOLA_DB_IMPORT_EXPORT"; CREATE DATABASE "KEBOOLA_DB_IMPORT_EXPORT"; GRANT ALL PRIVILEGES ON DATABASE "KEBOOLA_DB_IMPORT_EXPORT" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT"; GRANT USAGE ON WAREHOUSE "DEV" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT"; CREATE USER "KEBOOLA_DB_IMPORT_EXPORT" PASSWORD = 'Password' DEFAULT_ROLE = "KEBOOLA_DB_IMPORT_EXPORT"; GRANT ROLE "KEBOOLA_DB_IMPORT_EXPORT" TO USER "KEBOOLA_DB_IMPORT_EXPORT"; -- For GCS create storage integration https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#creating-a-custom-iam-role CREATE STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT" TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = GCS ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://<your gcs bucket>/'); -- set integration name to env GCS_INTEGRATION_NAME in .env file -- get service account id `STORAGE_GCP_SERVICE_ACCOUNT` DESC STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT"; -- continue according manual ^ in snflk documentation assign roles for Data loading and unloading
SYNAPSE
在Azure门户或使用CLI上创建synapse服务器。
设置环境变量:SYNAPSE_UID SYNAPSE_PWD SYNAPSE_DATABASE SYNAPSE_SERVER
运行查询
CREATE MASTER KEY;
这将为polybase创建主密钥。
托管标识
在vnet中使用ABS时需要托管标识。文档 如何设置和使用托管标识在 文档 中描述
TLDR;在ABS的IAM中添加角色分配 "Blob Storage Data {Reader or Contributor}" 到您的Synapse服务器主体
Exasol
您可以在Docker中本地运行Exasol或使用SaaS。
在Docker中本地运行Exasol
在docker中运行Exasol(对于此案例,.env已预配置)
docker compose up -d exasol
在其他地方运行Exasol服务器并设置环境变量
EXASOL_HOST= EXASOL_USERNAME= EXASOL_PASSWORD=
在SaaS中运行Exasol
登录到SaaS UI(或使用本地客户端)并创建具有以下权限的用户。
CREATE USER "<nick>_ie" IDENTIFIED BY "password"; GRANT CREATE SESSION, CREATE SCHEMA, CREATE TABLE, CREATE VIEW, CREATE USER, CREATE ROLE, DROP USER, DROP ANY ROLE, GRANT ANY ROLE, ALTER ANY SCHEMA, ALTER USER, IMPORT, EXPORT TO "<nick>_ie" WITH ADMIN OPTION;
获取主机(带有端口)、用户名和密码(从上一步)并在.env
中填写,如上所述。确保您的账户已为您IP启用网络。
Teradata
在AWS/Azure上准备Teradata服务器并设置以下属性。见
为测试创建新数据库
CREATE DATABASE <nick>_ie_lib_tests FROM dbc AS PERMANENT = 1e8, SPOOL = 1e8;
TERADATA_HOST= TERADATA_USERNAME= TERADATA_PASSWORD= TERADATA_PORT= TERADATA_DATABASE=
Bigquery
安装 Google Cloud客户端(通过Brew),初始化它并登录到 生成默认凭据。
要准备后端,您可以使用 Terraform模板。您必须具有组织的resourcemanager.folders.create
权限。
# you can copy it to a folder somewhere and make an init
terraform init
使用以下变量运行terraform apply
- folder_id:转到 GCP资源管理器 并选择您的团队开发文件夹ID(例如,找到'KBC Team Dev'并复制ID)
- backend_prefix:你的名字,所有资源都将使用此前缀创建
- billing_account_id:转到 计费 并复制您的计费账户ID
terraform apply -var folder_id=<folder_id> -var backend_prefix=<your_prefix> -var billing_account_id=<billing_account_id>
有关缺失部分的说明,请参阅 连接存储库。Terraform apply完成后,转到由Terraform创建的文件夹中的服务项目。
- 将密钥转换为字符串并保存到
.env
文件中:awk -v RS= '{$1=$1}1' <key_file>.json >> .env
- 将
.env
文件最后一行的内容设置为变量BQ_KEY_FILE
- 设置环境变量
BQ_BUCKET_NAME
,由TF模板中的file_storage_bucket_id
生成
测试
使用以下命令运行测试。
注意:必须提供Azure凭据并上传 fixtures
docker compose run --rm dev composer tests
单元和功能测试可以单独运行
#unit test
docker compose run --rm dev composer tests-unit
#functional test
docker compose run --rm dev composer tests-functional
代码质量检查
#phplint
docker compose run --rm dev composer phplint
#phpcs
docker compose run --rm dev composer phpcs
#phpstan
docker compose run --rm dev composer phpstan
完整的CI工作流程
此命令将运行所有检查,加载 fixtures 并运行测试
docker compose run --rm dev composer ci
用法
Snowflake
ABS -> Snowflake import/load
use Keboola\Db\ImportExport\Backend\Snowflake\Importer; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage; $absSourceFile = new Storage\ABS\SourceFile(...); $snowflakeDestinationTable = new Storage\Snowflake\Table(...); $importOptions = new ImportOptions(...); (new Importer($snowflakeConnection))->importTable( $absSourceFile, $snowflakeDestinationTable, $importOptions );
Snowflake -> Snowflake copy
use Keboola\Db\ImportExport\Backend\Snowflake\Importer; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage; $snowflakeSourceTable = new Storage\Snowflake\Table(...); $snowflakeDestinationTable = new Storage\Snowflake\Table(...); $importOptions = new ImportOptions(...); (new Importer($snowflakeConnection))->importTable( $snowflakeSourceTable, $snowflakeDestinationTable, $importOptions );
Snowflake -> ABS export/unload
use Keboola\Db\ImportExport\Backend\Snowflake\Exporter; use Keboola\Db\ImportExport\ExportOptions; use Keboola\Db\ImportExport\Storage; $snowflakeSourceTable = new Storage\Snowflake\Table(...); $absDestinationFile = new Storage\ABS\DestinationFile(...); $exportOptions = new ExportOptions(...); (new Exporter($snowflakeConnection))->exportTable( $snowflakeSourceTable, $absDestinationFile, $exportOptions );
Synapse next(实验性)
导入到Synapse
use Keboola\TableBackendUtils\Table\SynapseTableDefinition; use Keboola\TableBackendUtils\Table\SynapseTableQueryBuilder; use Keboola\Db\ImportExport\Backend\Synapse\ToStage\StageTableDefinitionFactory; use Keboola\Db\ImportExport\Storage; use Keboola\Db\ImportExport\Backend\Synapse\ToStage\ToStageImporter; use Keboola\Db\ImportExport\Backend\Synapse\SynapseImportOptions; use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\IncrementalImporter; use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\FullImporter; use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\SqlBuilder; use Doctrine\DBAL\Connection; $importSource = new Storage\ABS\SourceFile(...); // or $importSource = new Storage\Synapse\Table(...); // or $importSource = new Storage\Synapse\SelectSource(...); $destinationTable = new SynapseTableDefinition(...); $options = new SynapseImportOptions(...); $synapseConnection = new Connection(...); $stagingTable = StageTableDefinitionFactory::createStagingTableDefinition( $destinationTable, $importSource->getColumnsNames() ); $qb = new SynapseTableQueryBuilder($synapseConnection); $synapseConnection->executeStatement( $qb->getCreateTableCommandFromDefinition($stagingTable) ); $toStageImporter = new ToStageImporter($synapseConnection); $toFinalTableImporter = new IncrementalImporter($synapseConnection); // or $toFinalTableImporter = new FullImporter($synapseConnection); try { $importState = $toStageImporter->importToStagingTable( $importSource, $stagingTable, $options ); $result = $toFinalTableImporter->importToTable( $stagingTable, $destinationTable, $options, $importState ); } finally { $synapseConnection->executeStatement( (new SqlBuilder())->getDropTableIfExistsCommand( $stagingTable->getSchemaName(), $stagingTable->getTableName() ) ); }
内部/扩展
库由几个简单的接口组成。
创建新的后端
导入器、导出器接口必须在新的后端中实现
Keboola\Db\ImportExport\Backend\ImporterInterface
Keboola\Db\ImportExport\Backend\ExporterInterface
每个后端都有一个相应的适配器,该适配器支持自己组合的源接口和目标接口。可以使用 setAdapters
方法设置自定义适配器。
创建新存储
存储现在为文件存储 ABS|S3(未来)或表格存储 Snowflake|Synapse。存储可以有 Source
和 Destination
,这些必须实现 SourceInterface
或 DestinationInterface
。这些接口为空,适配器需要支持自己的组合。一般来说,每个 FileStorage <=> TableStorage 组合都有一个导入/导出适配器。
适配器必须实现
- 用于导入的
Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface
- 用于导出的
Keboola\Db\ImportExport\Backend\BackendExportAdapterInterface
后端可能需要自己的扩展 AdapterInterface(Synapse 和 Snowflake 现在是这样的)。
许可证
MIT 许可,请参阅 LICENSE 文件。