code-rhapsodie / dataflow-bundle
受PortPHP启发的数据处理框架
Requires
- php: ^8.0
- ext-json: *
- doctrine/dbal: ^2.12||^3.0
- doctrine/doctrine-bundle: ^1.0||^2.0
- monolog/monolog: ^1.0||^2.0
- psr/log: ^1.1||^2.0||^3.0
- symfony/config: ^3.4||^4.0||^5.0||^6.0
- symfony/console: ^3.4||^4.0||^5.0||^6.0
- symfony/dependency-injection: ^3.4||>=4.1.12||^5.0||^6.0
- symfony/event-dispatcher: ^3.4||^4.0||^5.0||^6.0
- symfony/http-kernel: ^3.4||^4.0||^5.0||^6.0
- symfony/lock: ^3.4||^4.0||^5.0||^6.0
- symfony/monolog-bridge: ^3.4||^4.0||^5.0||^6.0
- symfony/options-resolver: ^3.4||^4.0||^5.0||^6.0
- symfony/validator: ^3.4||^4.0||^5.0||^6.0
- symfony/yaml: ^3.4||^4.0||^5.0||^6.0
Requires (Dev)
- amphp/amp: ^2.5
- phpunit/phpunit: ^7||^8||^9
- rector/rector: ^0.13.10
- symfony/messenger: ^4.4||^5.0||^6.0
Suggests
- amphp/amp: Provide asynchronous steps for your dataflows
- portphp/portphp: Provides generic readers, steps and writers for your dataflows.
- symfony/messenger: Allows messenger mode, i.e. letting workers run jobs
README
DataflowBundle 是一个为 Symfony 3.4+ 提供创建导入/导出数据流的便捷方式的包。
Dataflow 使用由三部分组成的线性通用工作流程
- 一个读取器
- 任意数量的步骤,可以是同步或异步的
- 一个或多个写入器
读取器可以从任何地方读取数据并逐行返回数据。每个步骤处理当前行数据。步骤按添加的顺序执行。然后,一个或多个写入器将行数据保存到任何你想要的地方。
如下所示,你可以定义多个数据流
功能
- 定义和配置数据流
- 运行计划作业
- 从命令行运行一个数据流
- 从命令行定义数据流的计划
- 从命令行启用/禁用计划数据流
- 从命令行显示计划数据流的列表
- 从命令行显示数据流的最后一个作业的结果
- 与多个 Doctrine DBAL 连接一起工作
安装
安全警告:在 4.1.12 之前不支持 Symfony 4.x,请参阅 https://github.com/advisories/GHSA-pgwj-prpq-jpc2
添加依赖项
要安装此包,请运行以下命令
$ composer require code-rhapsodie/dataflow-bundle
建议
您可以使用来自 PortPHP 的通用读取器、写入器和步骤。
对于写入器,您必须使用类似于这样的适配器 CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter
<?php // ... $streamWriter = new \Port\Writer\StreamMergeWriter(); $builder->addWriter(new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter)); // ...
注册包
Symfony 4 (新树)
对于 Symfony 4,请在 config/bundles.php
文件中添加 CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
。
如下所示
<?php return [ // ... CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true], // ... ];
Symfony 3.4 (旧树)
对于 Symfony 3.4,请在 app/AppKernel.php
文件中添加新行。
如下所示
<?php // app/AppKernel.php public function registerBundles() { $bundles = [ // ... new CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle(), // ... ]; }
更新数据库
此包使用 Doctrine DBAL 将数据流计划存储到数据库表(cr_dataflow_scheduled
)和作业(cr_dataflow_job
)中。
如果您使用 Doctrine Migration Bundle 或 Phinx 或 Kaliop Migration Bundle 或其他,您可以使用此命令生成的 SQL 查询添加新迁移
$ bin/console code-rhapsodie:dataflow:dump-schema
如果您已经有表,则可以使用此命令生成的更新 SQL 查询添加新迁移
$ bin/console code-rhapsodie:dataflow:dump-schema --update
配置
默认情况下,使用的 Doctrine DBAL 连接是 default
。您可以配置默认连接。将此配置添加到您的 Symfony 配置中
code_rhapsodie_dataflow: dbal_default_connection: test #Name of the default connection used by Dataflow bundle
默认情况下,将使用 logger
服务记录所有异常和自定义消息。如果您想使用另一个记录器,如特定的 Monolog 处理器,请添加此配置
code_rhapsodie_dataflow: default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
消息传递模式
如果可用,Dataflow 可以将作业执行委托给 Symfony 消息传递组件。这允许作业由工作者并发执行而不是顺序执行。
要启用消息传递模式
code_rhapsodie_dataflow: messenger_mode: enabled: true # bus: 'messenger.default_bus' #Service ID of the bus you want Dataflow to use, if not the default one
您还需要将 Dataflow 消息路由到适当的传输
# config/packages/messenger.yaml framework: messenger: transports: async: '%env(MESSENGER_TRANSPORT_DSN)%' routing: CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
定义数据流类型
此包使用固定和简单的流程结构,以便让您专注于数据流的数据处理逻辑部分。
数据流类型定义了您数据流的不同部分。数据流由
- 恰好一个 读取器
- 任意数量的步骤
- 一个或多个编写者
可以使用选项配置数据流类型。
数据流类型必须实现CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface
。
为了帮助创建数据流类型,提供了一个抽象类CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType
,允许您通过方便的构建器CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder
定义数据流。
以下是一个定义一个类DataflowType的示例
<?php namespace CodeRhapsodie\DataflowExemple\DataflowType; use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType; use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder; use CodeRhapsodie\DataflowExemple\Reader\FileReader; use CodeRhapsodie\DataflowExemple\Writer\FileWriter; class MyFirstDataflowType extends AbstractDataflowType { private $myReader; private $myWriter; public function __construct(FileReader $myReader, FileWriter $myWriter) { $this->myReader = $myReader; $this->myWriter = $myWriter; } protected function buildDataflow(DataflowBuilder $builder, array $options): void { $this->myWriter->setDestinationFilePath($options['to-file']); $builder ->setReader($this->myReader->read($options['from-file'])) ->addStep(function ($data) use ($options) { // TODO : Write your code here... return $data; }) ->addWriter($this->myWriter) ; } protected function configureOptions(OptionsResolver $optionsResolver): void { $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]); $optionsResolver->setRequired('from-file'); } public function getLabel(): string { return 'My First Dataflow'; } public function getAliases(): iterable { return ['mfd']; } }
数据流类型必须带有标签coderhapsodie.dataflow.type
。
如果您正在使用Symfony自动配置服务,此标签将自动添加到实现DataflowTypeInterface
的所有服务中。
否则,请手动在数据流类型服务配置中添加标签coderhapsodie.dataflow.type
。
```yaml CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType: tags: - { name: coderhapsodie.dataflow.type }
为您的数据流类型使用选项
AbstractDataflowType
可以帮助您为Dataflow类型定义选项。
在您的DataflowType类中添加此方法
<?php // ... use Symfony\Component\OptionsResolver\OptionsResolver; class MyFirstDataflowType extends AbstractDataflowType { // ... protected function configureOptions(OptionsResolver $optionsResolver): void { $optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]); $optionsResolver->setRequired('from-file'); } }
在此配置中,选项fileName
是必需的。有关选项解析器的高级使用,请参阅Symfony文档。
对于异步管理,AbstractDataflowType
提供了两个默认选项
- loopInterval:默认为0。如果您想自定义
tick
循环的持续时间,请更新此间隔。 - emitInterval:默认为0。更新此间隔以控制读取器何时在流管道中发出新数据。
日志记录
所有异常都将被捕获并写入日志。如果您想将自定义消息添加到日志中,可以在您的读取器/步骤/编写者中注入日志记录器。如果您的DataflowType类扩展了AbstractDataflowType
,则日志记录器可通过$this->logger
访问。
<?php // ... use Symfony\Component\OptionsResolver\OptionsResolver; class MyDataflowType extends AbstractDataflowType { // ... protected function buildDataflow(DataflowBuilder $builder, array $options): void { $this->myWriter->setLogger($this->logger); } }
当使用code-rhapsodie:dataflow:run-pending
命令时,此日志记录器还将用于在数据库中保存相应的作业日志。
检查您的DataflowType是否就绪
执行此命令以检查您的DataflowType是否正确注册
$ bin/console debug:container --tag coderhapsodie.dataflow.type --show-private
结果如下
Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflow.type" Tag
===========================================================================================
---------------------------------------------------------------- ----------------------------------------------------------------
Service ID Class name
---------------------------------------------------------------- ----------------------------------------------------------------
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType
---------------------------------------------------------------- ----------------------------------------------------------------
读取器
读取器为数据流提供导入/导出的元素。通常,元素是从外部资源(文件、数据库、web服务等)读取的。
读取器可以是任何iterable
。
返回元素的类型约束仅是它们不能是false
。
读取器可以是像以下示例这样的生成器
<?php namespace CodeRhapsodie\DataflowExemple\Reader; class FileReader { public function read(string $filename): iterable { if (!$filename) { throw new \Exception("The file name is not defined. Define it with 'setFilename' method"); } if (!$fh = fopen($filename, 'r')) { throw new \Exception("Unable to open file '".$filename."' for read."); } while (false !== ($read = fgets($fh))) { yield explode('|', trim($read)); } } }
您可以根据以下设置配置此读取器
$builder->setReader(($this->myReader)())
步骤
步骤是在元素被编写者处理之前执行的运算。通常,步骤是
- 转换器,改变元素
- 过滤器,条件性地阻止对该元素执行进一步操作
- 生成器,可以包括异步操作
步骤可以是任何可调用对象,它以元素为其参数,并返回
- 元素,可能已更改
false
,如果不应对此元素执行进一步操作
一些示例
<?php //[...] $builder->addStep(function ($item) { // Titles are changed to all caps before export $item['title'] = strtoupper($item['title']); return $item; }); // asynchronous step with 2 scale factor $builder->addStep(function ($item): \Generator { yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long // Titles are changed to all caps before export $item['title'] = strtolower($item['title']); return $item; }, 2); $builder->addStep(function ($item) { // Private items are not exported if ($item['private']) { return false; } return $item; }); //[...]
注意:如果您将所有步骤都扩展到1倍因子,则可以确保异步操作的写入顺序。
编写者
编写者执行实际的导入/导出操作。
编写者必须实现CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface
。由于此接口与Port\Writer
不兼容,因此提供了适配器CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter
。
此示例展示了如何使用预定义的PhpPort编写者
$builder->addWriter(new PortWriterAdapter(new \Port\FileWriter()));
或您自己的编写者
<?php namespace CodeRhapsodie\DataflowExemple\Writer; use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface; class FileWriter implements WriterInterface { private $fh; /** @var string */ private $path; public function setDestinationFilePath(string $path) { $this->path = $path; } public function prepare() { if (null === $this->path) { throw new \Exception('Define the destination file name before use'); } if (!$this->fh = fopen($this->path, 'w')) { throw new \Exception('Unable to open in write mode the output file.'); } } public function write($item) { fputcsv($this->fh, $item); } public function finish() { fclose($this->fh); } }
CollectionWriter
如果您想从一个单个读取操作中写入多个项目,可以使用通用的CollectionWriter
。这个写入器会遍历它接收到的任何iterable
,并将该集合中的每个项目传递给您的处理单个项目的自定义写入器。
$builder->addWriter(new CollectionWriter($mySingleItemWriter));
DelegatorWriter
如果您想根据读取的项目调用不同的写入器,可以使用通用的DelegatorWriter
。
例如,假设我们的项目是数组,第一个条目是product
或order
。我们希望根据该值使用不同的写入器。
首先,创建您的写入器,实现DelegateWriterInterface
(此接口扩展了WriterInterface
,因此您的写入器仍然可以在不使用DelegatorWriter
的情况下使用)。
<?php namespace CodeRhapsodie\DataflowExemple\Writer; use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface; class ProductWriter implements DelegateWriterInterface { public function supports($item): bool { return 'product' === reset($item); } public function prepare() { } public function write($item) { // Process your product } public function finish() { } }
<?php namespace CodeRhapsodie\DataflowExemple\Writer; use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface; class OrderWriter implements DelegateWriterInterface { public function supports($item): bool { return 'order' === reset($item); } public function prepare() { } public function write($item) { // Process your order } public function finish() { } }
然后,配置您的DelegatorWriter
并将其添加到您的数据流类型中。
protected function buildDataflow(DataflowBuilder $builder, array $options): void { // Snip add reader and steps $delegatorWriter = new DelegatorWriter(); $delegatorWriter->addDelegate(new ProductWriter()); $delegatorWriter->addDelegate(new OrderWriter()); $builder->addWriter($delegatorWriter); }
在执行期间,DelegatorWriter
将简单地将其接收到的每个项目传递给其第一个支持它的代理(按照添加的顺序)。如果没有代理支持项目,将抛出异常。
队列
所有待处理的数据流作业过程都存储在数据库中的队列中。
将此命令添加到您的crontab以执行所有队列中的作业
$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:run-pending
命令
提供了一些命令来管理计划并运行作业。
code-rhapsodie:dataflow:run-pending
根据它们的计划执行队列中的作业。
当启用信使模式时,作业将根据它们的计划创建,但执行将由信使组件处理。
code-rhapsodie:dataflow:schedule:list
显示计划的数据流列表。
code-rhapsodie:dataflow:schedule:change-status
启用或禁用计划的数据流
code-rhapsodie:dataflow:schedule:add
为数据流添加计划。
code-rhapsodie:dataflow:job:show
显示作业的最后结果。
code-rhapsodie:dataflow:execute
允许您执行一个数据流作业。
code-rhapsodie:dataflow:dump-schema
生成创建/更新SQL查询的架构
与多个数据库一起工作
所有命令都有一个--connection
选项,用于在执行期间定义要使用的Doctrine DBAL连接。
示例
此命令使用default
DBAL连接生成所有架构更新查询。
$ bin/console code-rhapsodie:dataflow:dump-schema --update --connection=default
要执行特定连接的所有挂起作业,请使用
# Run for dataflow DBAL connection $ bin/console code-rhapsodie:dataflow:run-pending --connection=dataflow # Run for default DBAL connection $ bin/console code-rhapsodie:dataflow:run-pending --connection=default
问题和功能请求
请在https://github.com/code-rhapsodie/dataflow-bundle/issues处报告问题和请求功能。
请注意,仅支持此包3.x和4.x版本的最新发布版。
贡献
欢迎贡献。请参阅CONTRIBUTING.md以获取详细信息。感谢所有已做出贡献的人。
许可证
此软件包根据MIT许可证授权。