maketok/datamigration

Maketok 数据迁移引擎

0.3.1 2018-09-06 14:19 UTC

This package is auto-updated.

Last update: 2024-09-29 04:42:21 UTC


README

Build Status Scrutinizer Code Quality Code Coverage

此包用于在不同资源之间迁移大量数据。

适用场景

它非常适合计划和执行复杂的导入或导出操作。它还可以用作传输工具,将数据从一种结构迁移到另一种结构。单元(工作者)结构允许它解析复杂的导入文件,并从中提取多级数据结构。

反向操作也是一样:多级数据结构可以插入到单个文件中。

安装

使用 composer 将其包含到您的项目中

composer require maketok/datamigration

示例

tests/integration/QueueWorkflowTest 集成测试中有几个示例。以下是典型用法。

导入

从平面 CSV 文件导入客户和地址。

use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\CreateTmpFiles;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

/*
 * Customer unit: bound to the "customers" table, with one mapping entry
 * per column.
 */
$customerUnit = new Unit('customers');
$customerUnit->setTable('customers');
$customerUnit->setMapping([
    'id' => 'map.customer_id',
    // ExpressionLanguage is used to interpret string expressions
    'email' => 'map.email',
    // a closure or any other callable is also acceptable
    'name' => function (MapInterface $map) {
        return $map['name'];
    },
    'age' => 'map.age',
]);
/*
 * The is_entity condition resolves whether the unit should consider
 * the current row as an entity.
 * Some utility functions are available in
 *  Maketok\DataMigration\Expression\HelperExpressionsProvider
 */
$customerUnit->setIsEntityCondition("trim(map.email) is not empty");
/*
 * A contribution is the way for a unit to add data to the general pool
 * so that every other unit can use it.
 *
 * This is the logic for assigning customer_id:
 * first it checks whether the email exists in the pre-compiled hashmap.
 * If it does not, it calls the frozen increment for the "new_customer_id"
 * key, which is seeded with the last increment id if the key does not
 * exist yet.
 * frozenIncr differs from incr in that it is incremented only once the
 * is_entity condition resolves for the current row, so it is perfect for
 * incrementing "parent" entities.
 */
$customerUnit->addContribution(function (
    MapInterface $map,
    ResourceHelperInterface $resource,
    array $hashmaps
) {
    // normalise the lookup key once; it is used for both the check and the read
    $email = trim($map->email);
    if (isset($hashmaps['email-id'][$email])) {
        $map['customer_id'] = $hashmaps['email-id'][$email];
    } else {
        $map['customer_id'] = $map->frozenIncr(
            'new_customer_id',
            $resource->getLastIncrementId('customers')
        );
    }
});

/*
 * Address unit: bound to the "addresses" table and attached to the
 * customer unit as its parent.
 */
$addressUnit = new Unit('addresses');
$addressUnit->setTable('addresses');
$addressUnit->setMapping([
    // double-quoted PHP string, so the expression can carry its own
    // single-quoted string literals without terminating the PHP string
    'id' => "map.incr('address_id', resource.getLastIncrementId('addresses'))",
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
    'parent_id' => 'map.customer_id',
]);
$addressUnit->setParent($customerUnit);

// both units are handed to the actions through a single bag
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $addressUnit]);

/*
 * The source is a flat CSV file, so the reader is given a Shaper
 * instance that shapes the rows before they are fed to the
 * CreateTmpFiles action.
 */
$shaper = new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter());
$input = new Csv($fname, 'r', $shaper);

// queue the three import actions in order, then run them
$workflow = new QueueWorkflow($config, $result);

$createTmpFiles = new CreateTmpFiles(
    $bag,
    $config,
    $languageAdapter,
    $input,
    new ArrayMap(),
    $helperResource
);
$workflow->add($createTmpFiles);

$load = new Load($bag, $config, $resource);
$workflow->add($load);

$move = new Move($bag, $config, $resource);
$workflow->add($move);

$workflow->execute();

导出

我们有三个数据库表用于客户及其地址。

customer

customer_data

address

我们希望得到以下输出

customers.csv

use Maketok\DataMigration\MapInterface;
use Maketok\DataMigration\QueueWorkflow;
use Maketok\DataMigration\Unit\SimpleBag;
use Maketok\DataMigration\Unit\Type\Unit;
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface;
use Maketok\DataMigration\Action\Type\AssembleInput;
use Maketok\DataMigration\Action\Type\Load;
use Maketok\DataMigration\Action\Type\Move;
use Maketok\DataMigration\Input\Shaper\Processor\Nulls;

/*
 * Customer unit: bound to the "customer" table.
 */
$customerUnit = new Unit('customers');
$customerUnit->setTable('customer');
$customerUnit->setMapping([
    'id' => 'map.customer_id',
    'email' => 'map.email',
    'age' => 'map.age',
]);
$customerUnit->setIsEntityCondition("trim(map.email) is not empty");
/*
 * Contribution written as an expression string instead of a closure:
 * customer_id is taken from the "email-id" hashmap when the email is
 * known, otherwise from the frozen increment of "new_customer_id".
 */
$customerUnit->addContribution("map.offsetSet(
    'customer_id',
    (isset(hashmaps['email-id'][trim(map.email)]) ?
        hashmaps['email-id'][trim(map.email)] :
        map.frozenIncr(
            'new_customer_id',
            resource.getLastIncrementId('customer')
        )
    )
)");
// NOTE(review): the reversed connection/mapping are presumably what the
// export (reverse) direction reads — confirm against the library docs
$customerUnit->setReversedConnection([
    'customer_id' => 'id',
]);
$customerUnit->setReversedMapping([
    'email' => 'map.email',
    'age' => 'map.age',
]);

/*
 * Customer data unit: bound to the "customer_data" table and registered
 * as a sibling of the customer unit.
 */
$customerDataUnit = new Unit('customer_data');
$customerDataUnit->setTable('customer_data');
$customerDataUnit->setMapping([
    // double-quoted PHP string, so the expression can carry its own
    // single-quoted string literals without terminating the PHP string
    'id' => "map.incr('customer_data_id', resource.getLastIncrementId('customer_data'))",
    'parent_id' => 'map.customer_id',
    'firstname' => 'map.firstname',
    'lastname' => 'map.lastname',
]);
/*
 * The three contributions below split "name" on a space into
 * "complexName" and derive "firstname" / "lastname" from it; when there
 * are fewer than two parts, firstname falls back to the whole name and
 * lastname to an empty string.
 */
$customerDataUnit->addContribution("map.offsetSet(
    'complexName',
    explode(' ', map.name)
)");
$customerDataUnit->addContribution("map.offsetSet(
    'firstname',
    (count(map.complexName) >= 2 && isset(map.complexName[0]) ? map.complexName[0] : map.name)
)");
$customerDataUnit->addContribution("map.offsetSet(
    'lastname',
    (count(map.complexName) >= 2 && isset(map.complexName[1]) ? map.complexName[1] : '')
)");
$customerDataUnit->setReversedConnection([
    'customer_id' => 'parent_id',
]);
// the reversed mapping joins the two name parts back into a single "name"
$customerDataUnit->setReversedMapping([
    'name' => 'map.firstname ~ " " ~ map.lastname',
]);
$customerUnit->addSibling($customerDataUnit);

/*
 * Address unit: bound to the "address" table and attached to the
 * customer unit as its parent.
 */
$addressUnit = new Unit('addresses');
$addressUnit->setTable('address');
$addressUnit->setMapping([
    // double-quoted PHP string, so the expression can carry its own
    // single-quoted string literals without terminating the PHP string
    'id' => "map.incr('address_id', resource.getLastIncrementId('address'))",
    'customer_id' => 'map.customer_id',
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
]);
$addressUnit->setReversedConnection([
    'customer_id' => 'customer_id',
]);
$addressUnit->setReversedMapping([
    'street' => 'map.street',
    'city' => 'map.city',
    'zip' => 'map.zip',
]);
$addressUnit->setParent($customerUnit);

// all three units are handed to the actions through a single bag
$bag = new SimpleBag();
$bag->addSet([$customerUnit, $customerDataUnit, $addressUnit]);

// the target CSV is opened for writing and given a Nulls shaper
$shaper = new Nulls($bag, new ArrayMap(), $this->getLanguageAdapter());
$input = new Csv($fname, 'w', $shaper);

// queue the three export actions in order, then run them
$result = new Result();
$workflow = new QueueWorkflow($this->config, $result);

$reverseMove = new ReverseMove($bag, $config, $resource);
$workflow->add($reverseMove);

$dump = new Dump($bag, $config, $resource);
$workflow->add($dump);

$assembleInput = new AssembleInput(
    $bag,
    $config,
    $languageAdapter,
    $input,
    new ArrayMap()
);
$workflow->add($assembleInput);

$workflow->execute();

常见问题解答