whitedigital-eu/etl-bundle

Extract/Transform/Load (ETL) processing bundle for Symfony 6+

Installs: 323

Dependents: 0

Suggesters: 0

Security: 0

Stars: 1

Watchers: 1

Forks: 0

Open Issues: 2

Type: symfony-bundle

0.5.8 2024-05-31 10:17 UTC

README

A bundle for running ETL tasks:

  1. Define custom extractors, transformers and loaders.
  2. Create pipelines that run specific data import/export processes.
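The following is a minimal plain-PHP sketch of an extract → transform → load pipeline that shows how the three roles divide the work. The `Extractor`, `Transformer`, `Loader` and `Pipeline` names are hypothetical and are not the bundle's classes. The bundle's own pipeline (`$this->etlPipeline` in the task example) is configured with class names rather than instances.

```php
<?php

// Hypothetical interfaces for illustration only; the bundle's own
// extractor/transformer/loader base classes differ.
interface Extractor
{
    /** @return iterable<mixed> */
    public function extract(): iterable;
}

interface Transformer
{
    public function transform(mixed $record): mixed;
}

interface Loader
{
    public function load(mixed $record): void;
}

final class Pipeline
{
    /** @var list<Transformer> */
    private array $transformers = [];

    public function __construct(
        private readonly Extractor $extractor,
        private readonly Loader $loader,
    ) {
    }

    public function addTransformer(Transformer $transformer): self
    {
        $this->transformers[] = $transformer;
        return $this;
    }

    /** Extracts each record, applies transformers in registration order, loads it; returns the loaded count. */
    public function run(): int
    {
        $count = 0;
        foreach ($this->extractor->extract() as $record) {
            foreach ($this->transformers as $transformer) {
                $record = $transformer->transform($record);
            }
            $this->loader->load($record);
            $count++;
        }
        return $count;
    }
}

// Usage: two in-memory records, names upper-cased, collected by the loader.
$extractor = new class implements Extractor {
    public function extract(): iterable
    {
        yield ['code' => 'A1', 'name' => 'alpha'];
        yield ['code' => 'B2', 'name' => 'beta'];
    }
};
$upper = new class implements Transformer {
    public function transform(mixed $record): mixed
    {
        return ['code' => $record['code'], 'name' => strtoupper($record['name'])];
    }
};
$loader = new class implements Loader {
    public array $rows = [];

    public function load(mixed $record): void
    {
        $this->rows[] = $record;
    }
};

$loaded = (new Pipeline($extractor, $loader))->addTransformer($upper)->run();
```

This sketch streams one record at a time through all stages. A real pipeline may instead extract everything into a queue first, as the example extractor further down does.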

Tasks can be run:

  1. From the CLI (bin/console etl:run <task_name>)
  2. From a service or controller
  3. From the frontend, via the Server-Sent Events (SSE) API.
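This README does not document the bundle's SSE endpoint or its message payloads. For orientation only, the sketch below shows the generic `text/event-stream` wire format that any SSE API sends to the browser. Each event has optional `id:` and `event:` fields, one `data:` line per line of payload, and a blank line at the end. The `formatSseEvent()` helper and the progress messages are hypothetical.

```php
<?php

/**
 * Formats one Server-Sent Events message (text/event-stream).
 * Each line of $data becomes its own "data:" field, and a blank line
 * terminates the event.
 */
function formatSseEvent(string $data, ?string $event = null, ?string $id = null): string
{
    $frame = '';
    if ($id !== null) {
        $frame .= "id: {$id}\n";
    }
    if ($event !== null) {
        $frame .= "event: {$event}\n";
    }
    // Split on any line ending so multi-line payloads stay valid SSE.
    foreach (preg_split('/\r\n|\r|\n/', $data) as $line) {
        $frame .= "data: {$line}\n";
    }
    return $frame . "\n";
}

// Hypothetical progress messages that a long-running task might stream.
$frame = formatSseEvent("Extracted 2 records.\nLoading...", 'progress');
echo $frame;
```

In a Symfony controller, frames like these would typically be written from a `StreamedResponse` with the `Content-Type: text/event-stream` header. The browser would read them with `EventSource`.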

Requirements

  1. PHP 8.1+
  2. Symfony 6.2+

Installing the bundle

composer req "whitedigital-eu/etl-bundle"

Setting up a task

Example task (HorizonDataExtractor and HorizonCustomerTransformer must be created separately):

<?php

declare(strict_types=1);

namespace App\ETL\Task;

use App\ETL\Extractor\HorizonDataExtractor;
use App\ETL\Transformer\HorizonCustomerTransformer;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Attribute\AsTask;
use WhiteDigital\EtlBundle\Exception\EtlException;
use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader;
use WhiteDigital\EtlBundle\Task\AbstractTask;

#[AsTask(name: 'horizon_customer_import')]
class HorizonCustomerImportTask extends AbstractTask
{
    /**
     * @throws EtlException
     * @throws TransportExceptionInterface
     */
    public function runTask(OutputInterface $output, ?array $extractorArgs = null): void
    {
        $this->etlPipeline
            ->setOutput($output)
            // Fall back to the default Horizon REST query when no extractor arguments are passed
            ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? ['path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc'])
            ->addTransformer(HorizonCustomerTransformer::class)
            ->addLoader(DoctrineDbalLoader::class)
            ->run();
    }
}
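The `#[AsTask(name: ...)]` attribute gives the task the name that `etl:run` and `etl:list` use. This README does not show how the bundle collects tasks; a Symfony bundle would more typically do that when the container is compiled. The sketch below only illustrates the idea using plain PHP reflection. It relies on a hypothetical stand-in `AsTask` attribute and empty task classes, and `buildTaskRegistry()` is likewise hypothetical.

```php
<?php

// Hypothetical stand-in for the bundle's #[AsTask] attribute; the real one
// lives in WhiteDigital\EtlBundle\Attribute and may differ.
#[Attribute(Attribute::TARGET_CLASS)]
final class AsTask
{
    public function __construct(public readonly string $name)
    {
    }
}

#[AsTask(name: 'horizon_customer_import')]
final class HorizonCustomerImportTask
{
}

#[AsTask(name: 'demo_export')]
final class DemoExportTask
{
}

/**
 * Builds a sorted name => class-string map from the #[AsTask] attributes of the given classes.
 *
 * @param list<class-string> $classes
 * @return array<string, class-string>
 */
function buildTaskRegistry(array $classes): array
{
    $registry = [];
    foreach ($classes as $class) {
        foreach ((new ReflectionClass($class))->getAttributes(AsTask::class) as $attribute) {
            $name = $attribute->newInstance()->name;
            if (isset($registry[$name])) {
                throw new LogicException(sprintf('Duplicate task name "%s".', $name));
            }
            $registry[$name] = $class;
        }
    }
    ksort($registry);
    return $registry;
}

// A command like "etl:run horizon_customer_import" would look the name up here,
// and a listing command would print array_keys($registry).
$registry = buildTaskRegistry([HorizonCustomerImportTask::class, DemoExportTask::class]);
```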

Example extractor

<?php

declare(strict_types=1);

namespace App\ETL\Extractor;

use App\Service\HorizonRestApiService;
use Psr\Cache\InvalidArgumentException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Exception\ExtractorException;
use WhiteDigital\EtlBundle\Extractor\AbstractExtractor;
use WhiteDigital\EtlBundle\Helper\Queue;

final class HorizonDataExtractor extends AbstractExtractor
{
    public function __construct(
        private readonly HorizonRestApiService $horizonRestApiService,
    ) {
    }

    /**
     * @param \Closure|null $batchProcessor
     * @return Queue<\stdClass>
     * @throws ClientExceptionInterface
     * @throws ExtractorException
     * @throws InvalidArgumentException
     * @throws RedirectionExceptionInterface
     * @throws ServerExceptionInterface
     * @throws TransportExceptionInterface
     * @throws \JsonException
     */
    public function run(?\Closure $batchProcessor = null): Queue
    {
        if (null !== $batchProcessor) {
            throw new ExtractorException(sprintf('Batch mode not supported by %s', self::class));
        }
        $data = new Queue();

        $path = $this->getOption('path');
        $this->output->writeln(sprintf('Data extraction started from source: [%s]', $path));
        $rawJsonData = $this->horizonRestApiService->makeGetRequest($path);
        // "?? []" avoids a foreach warning when the response is null or contains no rows
        foreach ($rawJsonData?->collection->row ?? [] as $row) {
            $data->push($row);
        }
        $this->output->writeln(sprintf('Extracted %d records.', count($data)));

        return $data;
    }
}
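The core of the extractor is the loop that copies `collection->row` from the decoded Horizon response into a queue. The self-contained sketch below reproduces that loop against a JSON string. `SplQueue` stands in for `WhiteDigital\EtlBundle\Helper\Queue` and `LogicException` for `ExtractorException`. The function name `extractRows()` and the sample rows are hypothetical.

```php
<?php

/**
 * Decodes a Horizon-style JSON body and queues every element of collection.row.
 * SplQueue is a stand-in for the bundle's Queue helper (an assumption).
 *
 * @throws JsonException on malformed JSON
 */
function extractRows(string $json, ?Closure $batchProcessor = null): SplQueue
{
    if ($batchProcessor !== null) {
        // Same guard as the example extractor: batch mode is not supported.
        throw new LogicException('Batch mode not supported.');
    }

    $decoded = json_decode($json, false, 512, JSON_THROW_ON_ERROR);
    $queue = new SplQueue();

    // "?? []" keeps foreach quiet when the body is null or lacks collection.row.
    foreach ($decoded?->collection->row ?? [] as $row) {
        $queue->enqueue($row);
    }

    return $queue;
}

$queue = extractRows('{"collection":{"row":[{"KODS":"001","NOSAUK":"Alpha"},{"KODS":"002","NOSAUK":"Beta"}]}}');
```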

Example transformer
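As a rough illustration of the transform step, the plain function below maps raw Horizon rows to flat arrays that are ready for loading. It does not use the bundle's transformer base class, whose API this README does not show. The `KODS`/`NOSAUK` property names are assumed from the `columns` parameter in the task example, although the actual Horizon JSON may nest them differently. The target keys `code`/`name` and the `transformCustomers()` name are hypothetical.

```php
<?php

/**
 * Hypothetical transform step: turns raw rows (stdClass with KODS/NOSAUK)
 * into flat arrays keyed for an assumed target table (code, name).
 *
 * @param iterable<stdClass> $rows
 * @return list<array{code: string, name: string}>
 */
function transformCustomers(iterable $rows): array
{
    $result = [];
    foreach ($rows as $row) {
        $code = trim((string) ($row->KODS ?? ''));
        if ($code === '') {
            continue; // skip rows without a customer code
        }
        $result[] = [
            'code' => $code,
            'name' => trim((string) ($row->NOSAUK ?? '')),
        ];
    }
    return $result;
}

$customers = transformCustomers([
    (object) ['KODS' => ' 001 ', 'NOSAUK' => 'Alpha SIA'],
    (object) ['KODS' => '', 'NOSAUK' => 'No code'],
    (object) ['KODS' => '002'],
]);
```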


Example loader
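The example task loads data with the bundle's `DoctrineDbalLoader`, whose internals this README does not show. The sketch below only illustrates one technique a DBAL-based loader might use: grouping rows into multi-row `INSERT` statements with positional placeholders. The `buildInsertBatches()` name, the `customer` table and its columns are hypothetical.

```php
<?php

/**
 * Hypothetical load step: groups rows into multi-row INSERT statements with
 * positional placeholders. Table and column names are NOT escaped and must
 * come from trusted configuration.
 *
 * @param list<array<string, scalar|null>> $rows all rows must share the same keys
 * @return list<array{sql: string, params: list<scalar|null>}>
 */
function buildInsertBatches(string $table, array $rows, int $batchSize = 500): array
{
    if ($rows === []) {
        return [];
    }
    $columns = array_keys($rows[0]);
    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';

    $batches = [];
    foreach (array_chunk($rows, $batchSize) as $chunk) {
        // Flatten the chunk's values in column order to match the placeholders.
        $params = [];
        foreach ($chunk as $row) {
            foreach ($columns as $column) {
                $params[] = $row[$column];
            }
        }
        $batches[] = [
            'sql' => sprintf(
                'INSERT INTO %s (%s) VALUES %s',
                $table,
                implode(', ', $columns),
                implode(', ', array_fill(0, count($chunk), $rowPlaceholder)),
            ),
            'params' => $params,
        ];
    }
    return $batches;
}

$batches = buildInsertBatches('customer', [
    ['code' => '001', 'name' => 'Alpha SIA'],
    ['code' => '002', 'name' => 'Beta AS'],
    ['code' => '003', 'name' => 'Gamma'],
], 2);
```

With Doctrine DBAL, each batch could then be executed with `$connection->executeStatement($batch['sql'], $batch['params'])`.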


Console commands

  1. Run a task by name
bin/console etl:run horizon_customer_import

or pass additional custom arguments:

bin/console etl:run horizon_customer_import random_path.txt

  2. List available tasks
bin/console etl:list