whitedigital-eu / etl-bundle
Extract/Transform/Load (ETL) processing bundle for Symfony 6+
0.5.8
2024-05-31 10:17 UTC
Requires
- php: >=8.1
- doctrine/collections: *
- doctrine/dbal: *
- doctrine/orm: *
- doctrine/persistence: *
- psr/container: *
- symfony/config: ^6.2
- symfony/console: ^6.2
- symfony/dependency-injection: ^6.2
- symfony/http-kernel: ^6.2
- symfony/mailer: ^6.2
- symfony/property-access: ^6.2
- symfony/security-bundle: ^6.2
- symfony/service-contracts: *
- symfony/twig-bridge: ^6.2
- whitedigital-eu/audit-service: ^0.5|^0.6|^0.7
- whitedigital-eu/entity-resource-mapper-bundle: *
Requires (Dev)
- roave/security-advisories: dev-latest
- whitedigital-eu/config-pack: *
README
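A bundle for running ETL tasks.
- Define custom extractors, transformers and loaders
- Create pipelines that run specific data import/export processes
Tasks can be run:
- from the CLI (bin/console etl:run <task_name>)
- from a service or controller
- from the frontend, using the Server-Sent Events (SSE) API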
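The extract, transform and load flow described above can be illustrated with a small, framework-free PHP sketch. `MiniPipeline` and its closure-based stages are hypothetical names used only for illustration. They are not the bundle's `EtlPipeline` API; they only mirror the fluent `addExtractor()` / `addTransformer()` / `addLoader()` / `run()` shape used in the task example below.

```php
<?php

declare(strict_types=1);

// Hypothetical, framework-free sketch of the extract -> transform -> load flow.
// MiniPipeline is NOT the bundle's EtlPipeline; it only mirrors its fluent shape.
final class MiniPipeline
{
    /** @var list<callable(): iterable<mixed>> */
    private array $extractors = [];
    /** @var list<callable(mixed): mixed> */
    private array $transformers = [];
    /** @var list<callable(list<mixed>): void> */
    private array $loaders = [];

    public function addExtractor(callable $extractor): self
    {
        $this->extractors[] = $extractor;
        return $this;
    }

    public function addTransformer(callable $transformer): self
    {
        $this->transformers[] = $transformer;
        return $this;
    }

    public function addLoader(callable $loader): self
    {
        $this->loaders[] = $loader;
        return $this;
    }

    public function run(): void
    {
        // Extract: collect rows from every registered extractor
        $rows = [];
        foreach ($this->extractors as $extractor) {
            foreach ($extractor() as $row) {
                $rows[] = $row;
            }
        }
        // Transform: apply each transformer to every row, in registration order
        foreach ($this->transformers as $transformer) {
            $rows = array_map($transformer, $rows);
        }
        // Load: hand the full transformed set to each loader
        foreach ($this->loaders as $loader) {
            $loader($rows);
        }
    }
}

// Usage: two raw rows become upper-cased names in an in-memory "table"
$table = [];
(new MiniPipeline())
    ->addExtractor(fn (): array => [['code' => 'C1', 'name' => 'acme'], ['code' => 'C2', 'name' => 'globex']])
    ->addTransformer(fn (array $row): array => ['code' => $row['code'], 'name' => strtoupper($row['name'])])
    ->addLoader(function (array $rows) use (&$table): void {
        foreach ($rows as $row) {
            $table[$row['code']] = $row['name'];
        }
    })
    ->run();
```

Keeping each stage as a separate, swappable unit is what lets one task mix, for example, a REST extractor with a database loader.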
Requirements
- PHP 8.1+
- Symfony 6.2+
Installation
composer req "whitedigital-eu/etl-bundle"
Setting up a task
Example task (HorizonDataExtractor and HorizonCustomerTransformer must be created separately)
<?php

declare(strict_types=1);

namespace App\ETL\Task;

use App\ETL\Extractor\HorizonDataExtractor;
use App\ETL\Transformer\HorizonCustomerTransformer;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Attribute\AsTask;
use WhiteDigital\EtlBundle\Exception\EtlException;
use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader;
use WhiteDigital\EtlBundle\Task\AbstractTask;

// The name given here is what `bin/console etl:run <task_name>` expects
#[AsTask(name: 'horizon_customer_import')]
class HorizonCustomerImportTask extends AbstractTask
{
    /**
     * @throws EtlException
     * @throws TransportExceptionInterface
     */
    public function runTask(OutputInterface $output, ?array $extractorArgs = null): void
    {
        $this->etlPipeline
            ->setOutput($output)
            // Fall back to a default Horizon REST query when no extractor arguments are passed
            ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? ['path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc'])
            ->addTransformer(HorizonCustomerTransformer::class)
            ->addLoader(DoctrineDbalLoader::class)
            ->run();
    }
}
Example extractor
<?php

declare(strict_types=1);

namespace App\ETL\Extractor;

use App\Service\HorizonRestApiService;
use Psr\Cache\InvalidArgumentException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Exception\ExtractorException;
use WhiteDigital\EtlBundle\Extractor\AbstractExtractor;
use WhiteDigital\EtlBundle\Helper\Queue;

final class HorizonDataExtractor extends AbstractExtractor
{
    public function __construct(
        private readonly HorizonRestApiService $horizonRestApiService,
    ) {
    }

    /**
     * @param \Closure|null $batchProcessor
     * @return Queue<\stdClass>
     * @throws ClientExceptionInterface
     * @throws ExtractorException
     * @throws InvalidArgumentException
     * @throws RedirectionExceptionInterface
     * @throws ServerExceptionInterface
     * @throws TransportExceptionInterface
     * @throws \JsonException
     */
    public function run(?\Closure $batchProcessor = null): Queue
    {
        // This extractor fetches everything in a single request, so batch mode is rejected
        if (null !== $batchProcessor) {
            throw new ExtractorException(sprintf('Batch mode not supported by %s', __CLASS__));
        }

        $data = new Queue();
        $path = $this->getOption('path');
        // Latvian: "Data extraction started from source: [...]"
        $this->output->writeln(sprintf('Datu iegūšana uzsākta no avota: [%s]', $path));

        $rawJsonData = $this->horizonRestApiService->makeGetRequest($path);
        // Iterate over an empty array (not null) when the response has no rows
        foreach ($rawJsonData?->collection->row ?? [] as $row) {
            $data->push($row);
        }

        // Latvian: "Retrieved %s records."
        $this->output->writeln(sprintf('Iegūti %s ieraksti.', count($data)));

        return $data;
    }
}
Example transformer
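The bundle's transformer base class is not shown here, so the following is only a standalone sketch of the kind of mapping a transformer such as HorizonCustomerTransformer would presumably perform. The function name `transformHorizonCustomer` is hypothetical, and the row shape (a `K` object with `KODS` as the customer code and `NOSAUK` as the name) is an assumption inferred from the `columns=K.KODS,K.NOSAUK` query in the task example; check it against a real Horizon response.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical transform step: maps one raw Horizon row into a flat customer record.
 * ASSUMPTION: rows look like {"K": {"KODS": "...", "NOSAUK": "..."}}, inferred from
 * the query's columns=K.KODS,K.NOSAUK; verify against the real Horizon response.
 *
 * @return array{code: string, name: string}
 */
function transformHorizonCustomer(\stdClass $row): array
{
    $code = $row->K->KODS ?? null;
    if (!is_string($code) || trim($code) === '') {
        throw new InvalidArgumentException('Row has no customer code (K.KODS)');
    }

    return [
        'code' => trim($code),
        // Trim the name; a missing name becomes an empty string
        'name' => trim((string) ($row->K->NOSAUK ?? '')),
    ];
}

$raw = json_decode('{"K": {"KODS": " 1001 ", "NOSAUK": "SIA Example "}}', false, 512, JSON_THROW_ON_ERROR);
$customer = transformHorizonCustomer($raw);
```

Rejecting rows without a code at the transform stage keeps invalid records away from the loader.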
Example loader
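The task example uses the bundle's DoctrineDbalLoader, whose internals are not documented here. As a rough, hypothetical illustration of the loading step, the sketch below upserts records into an in-memory store keyed by customer code, so running the same import twice updates existing entries instead of duplicating them. The `loadCustomers` function and the array store (standing in for a database table) are assumptions made for illustration, not the bundle's API.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical load step: upserts customer records into a store keyed by 'code',
 * so re-running the same import updates rows instead of duplicating them.
 * The array store stands in for a database table; this is NOT DoctrineDbalLoader.
 *
 * @param array<int|string, array{code: string, name: string}> $store
 * @param iterable<array{code: string, name: string}> $records
 * @return array{inserted: int, updated: int}
 */
function loadCustomers(array &$store, iterable $records): array
{
    $stats = ['inserted' => 0, 'updated' => 0];
    foreach ($records as $record) {
        if (isset($store[$record['code']])) {
            $stats['updated']++;
        } else {
            $stats['inserted']++;
        }
        // Last write wins for an existing code
        $store[$record['code']] = $record;
    }

    return $stats;
}

$store = [];
$first = loadCustomers($store, [['code' => '1001', 'name' => 'Acme'], ['code' => '1002', 'name' => 'Globex']]);
// Re-run with one changed record and one new record
$second = loadCustomers($store, [['code' => '1001', 'name' => 'Acme Ltd'], ['code' => '1003', 'name' => 'Initech']]);
```

Keying on a natural identifier makes the import idempotent, which matters when the same task is re-run from the CLI.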
Console commands
- Run a task by name
bin/console etl:run horizon_customer_import
or pass additional custom arguments
bin/console etl:run horizon_customer_import random_path.txt
- List available tasks
bin/console etl:list