slayerbirden/dataflow

一个框架,用于将数据从一个源传输到另一个源。

1.0.3 2019-09-19 12:23 UTC

This package is auto-updated.

Last update: 2024-09-20 00:04:24 UTC


README

Build Status Scrutinizer Code Quality Code Coverage

构建一个管道,让您的数据流过!

关于

这是一个低级别的库,帮助您构建数据迁移过程。它需要一点初始化才能运行,但如果您只需要一个最小化的过程或想要测试一些东西,它很容易实现。

主要思想是您正在构建一个管道,其中每个部分都可以改变通过的数据。它使过程非常灵活,因为每个新步骤都可能获得(完全)转换后的数据。这也可能非常危险,因为错误可能代价高昂,因此我们精心设计了出色的错误处理和报告机制。

示例

构建一个简单的命令行导入脚本。

您需要将CSV文件导入到数据库中?让我们这样做!

  1. 为了举例,我们将用户导入到名为"users"的表中,该表具有以下列

  2. 我们的CSV文件如下所示

  3. 让我们使用PipeLine构建器设置一些已知步骤。我们需要一点初始化来获取DBAL writer和空发射器。

use Doctrine\DBAL\DriverManager;
use SlayerBirden\DataFlow\Emitter\BlackHole;
use SlayerBirden\DataFlow\PipelineBuilder;
use SlayerBirden\DataFlow\Writer\Dbal\UpdateStrategy\UniqueIndexStrategy;
use SlayerBirden\DataFlow\Writer\Dbal\Write;
use SlayerBirden\DataFlow\Writer\Dbal\WriterUtility;

# bootstrap
$connection = DriverManager::getConnection([
    'url' => 'mysql://test-user:testpwd@localhost:4486/foo?charset=UTF8',
]);
// this is just a utility class to "cache" schema info
$utility = new WriterUtility($connection);
$dbWrite = new Write(
    'users_write', // pipe ID for reporting
    $connection, // DBAL connection
    'users', // db table name
    $utility, // utility class
    new UniqueIndexStrategy('users', $utility), // update or insert will depend on unique fields in the table
    $this->emitter
);
$emitter = new BlackHole();

# pipeline
$pipeline = (new PipelineBuilder($emitter))
    ->addSection($dbWrite)
    ->build();
  1. 现在启动Plumber并倒入。
use SlayerBirden\DataFlow\Plumber;
use SlayerBirden\DataFlow\Provider\Csv;
...
$file = new \SplFileObject(__DIR__ . '/users.csv');
$file->setFlags(\SplFileObject::READ_CSV | \SplFileObject::READ_AHEAD | \SplFileObject::SKIP_EMPTY);
(new Plumber(new Csv('users_file', $file), $pipeline, $emitter))->pour();
  1. 我们想要一些报告来了解发生了什么。让我们实现基本的stdOut发射器。
$emitter = new class implements \SlayerBirden\DataFlow\EmitterInterface
{
    public function emit(string $event, ...$args): void
    {
        echo $event, ' ==> ', implode(', ', $args), PHP_EOL;
    }
};
  1. 我们还需要做其他事情:将firstName和lastName连接起来,并分配给"name"。
use SlayerBirden\DataFlow\DataBagInterface;

$pipeline = (new PipelineBuilder($emitter))
    ->map('name', new class implements MapperCallbackInterface
    {
        public function __invoke($value, DataBagInterface $dataBag)
        {
            return $dataBag['first'] . ' ' . $dataBag['last'];
        }
    })
    ->addSection($dbWrite)
    ->build();
  1. 完整的文件可以在examples/example1-import-cli/import.php下找到。
  2. 日志结果:results

目标

  • 易于维护。强大的报告显示了到底发生了什么错误。
  • 灵活。极其灵活的工作流程允许处理不同的导入/导出/迁移场景。
    • 一个文件 -> 多个表
    • 多个文件 -> 一个表
    • 支持动态数据库连接
    • 支持不同的源类型和目标类型
  • 快速。低级操作产生最大流量速度。
  • 稳定。强大的测试覆盖率保证了无错误的底层。

影响