adn-magento / etl

Adn Magento ETL

安装: 0

依赖者: 0

建议者: 0

安全: 0

星标: 0

关注者: 1

分支: 0

开放问题: 0

类型:magento2-module

100.0.0 2022-03-29 08:34 UTC

This package is auto-updated.

Last update: 2024-09-29 06:14:44 UTC


README

使用提取/转换/加载(ETL)模式的数据转换模块。

要求

安装

composer require adn-magento/etl

概念

示例

<?php

use Adn\Etl\Model\Pipeline;
use Adn\Etl\Model\Process;
use Adn\Etl\Model\Process\Extractor;
use Adn\Etl\Model\Process\Transformer;
use Adn\Etl\Model\Process\Loader;
use Adn\Etl\Model\Context;
use Adn\Etl\Model\Runner;

// Batch size handed to every Process as its last constructor argument.
$batchSize = 100;

// Extraction callback shared by both processes: always returns the same two demo rows.
$extractRows = function (Context $context) {
    return [
        ['label' => 'first row'],
        ['label' => 'second row'],
    ];
};

// Builds the transformation callback for one process. Each invocation of the
// returned callback appends one entry stamped with the running 'identifier'
// read from the Context, then writes the incremented identifier back.
$buildTransform = function ($sourceName) {
    return function (&$entries, $entry, Context $context) use ($sourceName) {
        $currentId = $context->getData('identifier');

        $entries[] = [
            'id'     => (int)$currentId,
            'source' => $sourceName,
            'data'   => (string)$entry['label'],
        ];

        $context->setData('identifier', $currentId + 1);
    };
};

// Load callback shared by both processes: dumps the entries it receives.
$dumpEntries = function (array $entries, Context $context) {
    var_dump($entries);
};

$pipeline = new Pipeline([
    new Process(
        'First Process',
        new Extractor($extractRows),
        new Transformer($buildTransform('first_process_etl')),
        new Loader($dumpEntries),
        $batchSize
    ),
    new Process(
        'Second Process',
        new Extractor($extractRows),
        new Transformer($buildTransform('second_process_etl')),
        new Loader($dumpEntries),
        $batchSize
    ),
]);

// Shared state for the whole run; the transformers read and increment 'identifier'.
$context = new Context(['identifier' => 1]);

$runner = new Runner();

// Callback passed to the runner as the pre-process hook: prints a start marker.
$preProcess = function (Process $process, Context $context) {
    // printf() writes the message; sprintf() would only build the string and discard it.
    printf('Process %s start' . PHP_EOL, $process->getName());
};

// Callback passed to the runner as the post-process hook: prints an end marker.
$postProcess = function (Process $process, Context $context) {
    printf('Process %s end' . PHP_EOL, $process->getName());
};

$runner->runPipeline(
    $pipeline,
    $context,
    $preProcess,
    $postProcess
);