code-rhapsodie/dataflow-bundle

受PortPHP启发的数据处理框架

安装数: 40,077

依赖者: 1

建议者: 0

安全: 0

星标: 15

关注者: 4

分支: 4

开放问题: 3

类型:symfony-bundle

v4.1.2 2023-12-06 12:56 UTC

README

DataflowBundle 是一个为 Symfony 3.4+ 提供创建导入/导出数据流的便捷方式的包。

Dataflow 使用由三部分组成的线性通用工作流程

  • 一个读取器
  • 任意数量的步骤,可以是同步或异步的
  • 一个或多个写入器

读取器可以从任何地方读取数据并逐行返回数据。每个步骤处理当前行数据。步骤按添加的顺序执行。然后,一个或多个写入器将行数据保存到任何你想要的地方。

如下所示,你可以定义多个数据流

Dataflow schema

功能

  • 定义和配置数据流
  • 运行计划作业
  • 从命令行运行一个数据流
  • 从命令行定义数据流的计划
  • 从命令行启用/禁用计划数据流
  • 从命令行显示计划数据流的列表
  • 从命令行显示数据流的最后一个作业的结果
  • 与多个 Doctrine DBAL 连接一起工作

安装

安全警告:在 4.1.12 之前不支持 Symfony 4.x,请参阅 https://github.com/advisories/GHSA-pgwj-prpq-jpc2

添加依赖项

要安装此包,请运行以下命令

$ composer require code-rhapsodie/dataflow-bundle

建议

您可以使用来自 PortPHP 的通用读取器、写入器和步骤。

对于写入器,您必须使用类似于这样的适配器 CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter

<?php
// ...
$streamWriter = new \Port\Writer\StreamMergeWriter();

$builder->addWriter(new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter));
// ...

注册包

Symfony 4 (新树)

对于 Symfony 4,请在 config/bundles.php 文件中添加 CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],

如下所示

<?php

return [
     // ...
    CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
    // ...
];

Symfony 3.4 (旧树)

对于 Symfony 3.4,请在 app/AppKernel.php 文件中添加新行。

如下所示

<?php
// app/AppKernel.php

public function registerBundles()
{
    $bundles = [
        // ...
        new CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle(),
        // ...
    ];
}

更新数据库

此包使用 Doctrine DBAL 将数据流计划存储到数据库表(cr_dataflow_scheduled)和作业(cr_dataflow_job)中。

如果您使用 Doctrine Migration BundlePhinxKaliop Migration Bundle 或其他,您可以使用此命令生成的 SQL 查询添加新迁移

$ bin/console code-rhapsodie:dataflow:dump-schema

如果您已经有表,则可以使用此命令生成的更新 SQL 查询添加新迁移

$ bin/console code-rhapsodie:dataflow:dump-schema --update

配置

默认情况下,使用的 Doctrine DBAL 连接是 default。您可以配置默认连接。将此配置添加到您的 Symfony 配置中

code_rhapsodie_dataflow:
  dbal_default_connection: test #Name of the default connection used by Dataflow bundle

默认情况下,将使用 logger 服务记录所有异常和自定义消息。如果您想使用另一个记录器,如特定的 Monolog 处理器,请添加此配置

code_rhapsodie_dataflow:
  default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use

消息传递模式

如果可用,Dataflow 可以将作业执行委托给 Symfony 消息传递组件。这允许作业由工作者并发执行而不是顺序执行。

要启用消息传递模式

code_rhapsodie_dataflow:
  messenger_mode:
    enabled: true
    # bus: 'messenger.default_bus' #Service ID of the bus you want Dataflow to use, if not the default one

您还需要将 Dataflow 消息路由到适当的传输

# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      async: '%env(MESSENGER_TRANSPORT_DSN)%'

    routing:
      CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async

定义数据流类型

此包使用固定和简单的流程结构,以便让您专注于数据流的数据处理逻辑部分。

数据流类型定义了您数据流的不同部分。数据流由

  • 恰好一个 读取器
  • 任意数量的步骤
  • 一个或多个编写者

可以使用选项配置数据流类型。

数据流类型必须实现CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface

为了帮助创建数据流类型,提供了一个抽象类CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType,允许您通过方便的构建器CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder定义数据流。

以下是一个定义一个类DataflowType的示例

<?php
namespace CodeRhapsodie\DataflowExemple\DataflowType;

use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\DataflowExemple\Reader\FileReader;
use CodeRhapsodie\DataflowExemple\Writer\FileWriter;

class MyFirstDataflowType extends AbstractDataflowType
{
    private $myReader;

    private $myWriter;

    public function __construct(FileReader $myReader, FileWriter $myWriter)
    {
        $this->myReader = $myReader;
        $this->myWriter = $myWriter;
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $this->myWriter->setDestinationFilePath($options['to-file']);

        $builder
            ->setReader($this->myReader->read($options['from-file']))
            ->addStep(function ($data) use ($options) {
                // TODO : Write your code here...
                return $data;
            })
            ->addWriter($this->myWriter)
        ;
    }

    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
        $optionsResolver->setRequired('from-file');
    }

    public function getLabel(): string
    {
        return 'My First Dataflow';
    }

    public function getAliases(): iterable
    {
        return ['mfd'];
    }
}

数据流类型必须带有标签coderhapsodie.dataflow.type

如果您正在使用Symfony自动配置服务,此标签将自动添加到实现DataflowTypeInterface的所有服务中。

否则,请手动在数据流类型服务配置中添加标签coderhapsodie.dataflow.type

```yaml
    CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
      tags:
        - { name: coderhapsodie.dataflow.type }

为您的数据流类型使用选项

AbstractDataflowType可以帮助您为Dataflow类型定义选项。

在您的DataflowType类中添加此方法

<?php
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;

class MyFirstDataflowType extends AbstractDataflowType
{
    // ...
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
        $optionsResolver->setRequired('from-file');
    }

}

在此配置中,选项fileName是必需的。有关选项解析器的高级使用,请参阅Symfony文档

对于异步管理,AbstractDataflowType提供了两个默认选项

  • loopInterval:默认为0。如果您想自定义tick循环的持续时间,请更新此间隔。
  • emitInterval:默认为0。更新此间隔以控制读取器何时在流管道中发出新数据。

日志记录

所有异常都将被捕获并写入日志。如果您想将自定义消息添加到日志中,可以在您的读取器/步骤/编写者中注入日志记录器。如果您的DataflowType类扩展了AbstractDataflowType,则日志记录器可通过$this->logger访问。

<?php
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;

class MyDataflowType extends AbstractDataflowType
{
    // ...
    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $this->myWriter->setLogger($this->logger);
    }

}

当使用code-rhapsodie:dataflow:run-pending命令时,此日志记录器还将用于在数据库中保存相应的作业日志。

检查您的DataflowType是否就绪

执行此命令以检查您的DataflowType是否正确注册

$ bin/console debug:container --tag coderhapsodie.dataflow.type --show-private

结果如下

Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflow.type" Tag
===========================================================================================

 ---------------------------------------------------------------- ---------------------------------------------------------------- 
  Service ID                                                       Class name                                                      
 ---------------------------------------------------------------- ---------------------------------------------------------------- 
  CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType   CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType  
 ---------------------------------------------------------------- ---------------------------------------------------------------- 

读取器

读取器为数据流提供导入/导出的元素。通常,元素是从外部资源(文件、数据库、web服务等)读取的。

读取器可以是任何iterable

返回元素的类型约束仅是它们不能是false

读取器可以是像以下示例这样的生成器

<?php

namespace CodeRhapsodie\DataflowExemple\Reader;

class FileReader
{
    public function read(string $filename): iterable
    {
        if (!$filename) {
            throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
        }

        if (!$fh = fopen($filename, 'r')) {
            throw new \Exception("Unable to open file '".$filename."' for read.");
        }

        while (false !== ($read = fgets($fh))) {
            yield explode('|', trim($read));
        }
    }
}

您可以根据以下设置配置此读取器

$builder->setReader(($this->myReader)())

步骤

步骤是在元素被编写者处理之前执行的运算。通常,步骤是

  • 转换器,改变元素
  • 过滤器,条件性地阻止对该元素执行进一步操作
  • 生成器,可以包括异步操作

步骤可以是任何可调用对象,它以元素为其参数,并返回

  • 元素,可能已更改
  • false,如果不应对此元素执行进一步操作

一些示例

<?php
//[...]
$builder->addStep(function ($item) {
    // Titles are changed to all caps before export
    $item['title'] = strtoupper($item['title']);

    return $item;
});

// asynchronous step with 2 scale factor
$builder->addStep(function ($item): \Generator {
    yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long

    // Titles are changed to all caps before export
    $item['title'] = strtolower($item['title']);

    return $item;
}, 2);

$builder->addStep(function ($item) {
    // Private items are not exported
    if ($item['private']) {
        return false;
    }

    return $item;
});
//[...]

注意:如果您将所有步骤都扩展到1倍因子,则可以确保异步操作的写入顺序。

编写者

编写者执行实际的导入/导出操作。

编写者必须实现CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface。由于此接口与Port\Writer不兼容,因此提供了适配器CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter

此示例展示了如何使用预定义的PhpPort编写者

$builder->addWriter(new PortWriterAdapter(new \Port\FileWriter()));

或您自己的编写者

<?php
namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;

class FileWriter implements WriterInterface
{
    private $fh;

    /** @var string */
    private $path;

    public function setDestinationFilePath(string $path) {
        $this->path = $path;
    }

    public function prepare()
    {
        if (null === $this->path) {
            throw new \Exception('Define the destination file name before use');
        }
        if (!$this->fh = fopen($this->path, 'w')) {
            throw new \Exception('Unable to open in write mode the output file.');
        }
    }

    public function write($item)
    {
        fputcsv($this->fh, $item);
    }

    public function finish()
    {
        fclose($this->fh);
    }
}

CollectionWriter

如果您想从一个单个读取操作中写入多个项目,可以使用通用的CollectionWriter。这个写入器会遍历它接收到的任何iterable,并将该集合中的每个项目传递给您的处理单个项目的自定义写入器。

$builder->addWriter(new CollectionWriter($mySingleItemWriter));

DelegatorWriter

如果您想根据读取的项目调用不同的写入器,可以使用通用的DelegatorWriter

例如,假设我们的项目是数组,第一个条目是productorder。我们希望根据该值使用不同的写入器。

首先,创建您的写入器,实现DelegateWriterInterface(此接口扩展了WriterInterface,因此您的写入器仍然可以在不使用DelegatorWriter的情况下使用)。

<?php
namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;

class ProductWriter implements DelegateWriterInterface
{
    public function supports($item): bool
    {
        return 'product' === reset($item);
    }

    public function prepare()
    {
    }

    public function write($item)
    {
        // Process your product
    }

    public function finish()
    {
    }
}
<?php
namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;

class OrderWriter implements DelegateWriterInterface
{
    public function supports($item): bool
    {
        return 'order' === reset($item);
    }

    public function prepare()
    {
    }

    public function write($item)
    {
        // Process your order
    }

    public function finish()
    {
    }
}

然后,配置您的DelegatorWriter并将其添加到您的数据流类型中。

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // Snip add reader and steps

        $delegatorWriter = new DelegatorWriter();
        $delegatorWriter->addDelegate(new ProductWriter());
        $delegatorWriter->addDelegate(new OrderWriter());

        $builder->addWriter($delegatorWriter);
    }

在执行期间,DelegatorWriter将简单地将其接收到的每个项目传递给其第一个支持它的代理(按照添加的顺序)。如果没有代理支持项目,将抛出异常。

队列

所有待处理的数据流作业过程都存储在数据库中的队列中。

将此命令添加到您的crontab以执行所有队列中的作业

$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:run-pending

命令

提供了一些命令来管理计划并运行作业。

code-rhapsodie:dataflow:run-pending根据它们的计划执行队列中的作业。

当启用信使模式时,作业将根据它们的计划创建,但执行将由信使组件处理。

code-rhapsodie:dataflow:schedule:list显示计划的数据流列表。

code-rhapsodie:dataflow:schedule:change-status启用或禁用计划的数据流

code-rhapsodie:dataflow:schedule:add为数据流添加计划。

code-rhapsodie:dataflow:job:show显示作业的最后结果。

code-rhapsodie:dataflow:execute允许您执行一个数据流作业。

code-rhapsodie:dataflow:dump-schema生成创建/更新SQL查询的架构

与多个数据库一起工作

所有命令都有一个--connection选项,用于在执行期间定义要使用的Doctrine DBAL连接。

示例

此命令使用default DBAL连接生成所有架构更新查询。

$ bin/console code-rhapsodie:dataflow:dump-schema --update --connection=default

要执行特定连接的所有挂起作业,请使用

# Run for dataflow DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=dataflow
# Run for default DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=default

问题和功能请求

请在https://github.com/code-rhapsodie/dataflow-bundle/issues处报告问题和请求功能。

请注意,仅支持此包3.x和4.x版本的最新发布版。

贡献

欢迎贡献。请参阅CONTRIBUTING.md以获取详细信息。感谢所有已做出贡献的人

许可证

此软件包根据MIT许可证授权。