fab2s / yaetl
广泛扩展的节点提取-转换-加载 ETL 工作流程,也称为 NEJQTL 或 节点提取-连接-资格-转换-加载
Requires
- php: ^7.2|^8.0
- fab2s/nodalflow: ^2.0.5
- fab2s/opinhelpers: ^1.0
Requires (Dev)
- ext-pdo: *
- friendsofphp/php-cs-fixer: ^3.0|^2.0
- orchestra/testbench: ^7.0|^6.0|^5.0|^4.0
- phpunit/phpunit: ^9.0|^8.0|^7.5
- symfony/console: ^6.0|^5.0|^4.0|^3.4|^2.8
- symfony/event-dispatcher: ^6.0|^5.0|^4.0|^3.4|^2.8
Suggests
- laravel/laravel: To use Laravel (The awesome) implementations
- symfony/console: To use ProgressBarSubscriber
README
YaEtl
(“Yay'TL”,或 YetAnotherEtl)是基于 NodalFlow 的 PHP 实现的提取-转换-加载(ETL)工作流程。ETL 工作流程在遇到大量记录和不同来源、格式和存储库的情况下非常有用。《YaEtl》扩展了此模式,允许您通过一些额外的功能(如连接和资格)链式执行任意数量的 E-T-L 操作。YaEtl 可以仅进行提取和加载,不涉及转换,甚至可以仅进行加载或转换。如果我们给 YaEtl
背后的工作流程起个缩写,它可以结果是 NEJQTL,即 节点提取-连接-资格-转换-加载 工作流程。
NodalFlow 是 YaEtl 所基于的底层和更通用的可执行有向图的实现。有向图由节点组成,这些节点可执行,接受一个参数,可能返回一个值,该值将用作下一个节点的参数;否则,将传递未修改的参数给下一个节点,直到达到流的执行参数(如果有的话)。节点还可以是可遍历的(数据生成器等),在这种情况下,它们将迭代流中的每个值,直到用完。当一个节点被“遍历”时,每个产生的值将触发后续节点的执行,这些节点是否带有产生的值作为参数,取决于可遍历节点的属性。这些有向图中的每个都可以由进程中的任何其他实例调用,也可以由每个节点和任何节点位置调用,这实际上可以将任何此类图集变成可执行的节点网络。
这种设计的最大优点是,除了轻松组织复杂任务外,还可以创建可重用和原子的任务。工作流程中的每个节点都可以在任意其他工作流程中重用,就像它现在这样。实际上,随着时间的推移和代码库的增长,这将节省大量时间。
节点化使其能够链式执行任意数量的提取到加载操作,这些操作可能经过任意数量的转换、连接,甚至可以分支工作流程,以便某些加载器在执行其工作之前需要不同的转换和/或连接。
YaEtl 文档
文档可以在 ReadTheDocs 找到。检查 NodalFlow 文档 也是一个好主意,特别是有关那些可以直接在
YaEtl
中使用的核心功能,如 中断、序列化 或 sendTo()
方法,该方法允许您将您的流程转换为可执行的流程和节点网络。
安装
可以使用composer安装YaEtl
composer require "fab2s/yaetl"
如果您想专门安装php >=7.2.0版本,请使用以下命令:
composer require "fab2s/yaetl" ^2
如果您想专门安装php 5.6/7.1版本,请使用以下命令:
composer require "fab2s/yaetl" ^1
完成以上步骤后,您就可以开始使用了
$yaEtl = new YaEtl; $yaEtl->from($extractor = new Extractor) -> transform(new Transformer) ->to(new Loader) ->exec(); // forgot something ? // continuing with the same object $yaEtl->transform(new AnotherTransformer) ->to(new CsvLoader) ->transform(new SuperCoolTransformer) ->to(new S3Loader) // never too cautious ->to(new FlatLogLoader) // Flows are repeatable ->exec(); // oh but what if ... $yaEtl->branch( (new YaEtl)->transform(new SwaggyTransformer) // Enrich $extractor's records ->join($extractor, new HypeJoiner($pdo, $query, new OnClose('upstreamFieldName', 'joinerFieldName', function($upstreamRecord, $joinerRecord) { return array_replace($joinerRecord, $upstreamRecord); }))) ->transform(new PutItAllTogetherTransformer) ->to(new SuperSpecializedLoader) )->exec(); // or another branch for a subset of the extraction $yaEtl->branch( (new YaEtl)->qualify(new CallableQualifier(function($record) { return !empty($record['is_great']); }) ->transform(new GreatTransformer) ->to(new GreatLoader) )->exec(); // need a Progress Bar ? $progressSubscriber = new ProgressBarSubscriber($yaEtl); // with count ? $progressSubscriber->setNumRecords($count);
使用模式
YaEtl
可以轻松应对多种通用用例,其中一些可能需要更专业或复杂的编码。
纯ETL
YaEtl
可以,但不限于,实现纯ETL流程,这在很多情况下都能看到。值得注意的是,YaEtl
设计上支持批量的提取和加载。
+---------+ +---------+
| | | |
| | | |
| data | | data |
| source | | storage |
| | | |
| | | |
| | | |
++-+-+-+-++ +-^-^-^-^-+
| + + + | + + + +
|Records| Records
| + + + | + + + +
+v-v-v-v-v--+ +-+-+-+-++
| Extract | | Load |
+-+---------+ +---^----+
| |
| Record +-----------+ Record |
+---------> Transform +---------+
+-----------+
共享提取
作为节点式架构,YaEtl
可以透明地跨多个用例共享提取操作,最终甚至可能不需要加载。
+-------------+
| |
| |
| slow http |
| data source |
| |
| |
+-----+-------+
|
| +----------+
+-----v-------+ +--> Loader 1 |
| Extractor | | +----------+
+-+-----------+ |
| | +----------+
| Record 1 +-------------+ +--> Loader 2 |
+------------> Transformer +--+ +----------+
| +-------------+ | ...
| Record 2 |
+------------> | +----------+
| ... +--> Loader N |
| | +----------+
| Record N |
+------------> | +-------------+
+--> Transformer |
+-+-----------+
| +----------+
+------------> Loader X |
+----------+
分类提取
由于提取器将上游返回值作为参数,因此可以链式调用提取器本身以获取分类中的项目。这有助于分离关注点,因为它使得在仍然使用专业提取器的情况下提取所有分类中的所有项目成为可能,例如一个分类和一个项目;前提是项目提取器也能在传递适当的分类对象作为参数时按分类提取项目(除非您在开始提取项目时指定了参数,否则这种情况不会发生)。
+------------+
| |
| |
| categories |
| |
| |
+-----+------+
|
|
+-----v------+
| Extractor |
++-----------+
|
| category +-----------+
+------------> Extractor |
| ++----------+
+-----------> |
... | item
+----------> ...
|
+---------->
...
分片提取
有时,可能需要在低级别从多个物理源和/或分片提取数据,即没有任何预定义和现成可用的抽象。
使用YaEtl
进行此类操作很简单,因为当构建流程时,提取器可以相互聚合。例如,您可能希望提取跨越多个源的数据,其中每个源仅保留特定的时间范围。然后,可以为每个分片实例化相同的提取器,并执行适当的排序,以最终提取所有数据,就像它们存储在单个存储库中一样。YaEtl
将随后内部按提取器添加到流程中的顺序消费每个提取器的记录,并逐个提供给剩余的节点,就像只使用了单个提取器一样。
+-------------+ +-------------+ +-------------+
| | | | | |
| shard 1 | | shard 2 | ... | shard N |
| | | | | |
+------+------+ +------+------+ +------+------+
| | |
| | |
+------------------------------------------------------------+
| | | | |
| +------v------+ +------v------+ +------v------+ |
| | | | | | | |
| | Extractor 1 | | Extractor 2 | ... | Extractor N | |
| | | | | | | |
| +------+------+ +------+------+ +-------+-----+ |
| | | + |
| | | Aggregate Node | Records
+-----------v----------------v--------------------v----------+---------->
...
连接
YaEtl
提供了所有必要的接口来实现连接操作,这与数据库管理系统的方式非常相似(常规和左连接)。在底层,这需要传递某种类型的记录映射,以便连接器知道在过程中匹配哪些记录。YaEtl
自带了一个通用的可连接提取器的PDO
实现(针对单个唯一键)。此类功能的应用场景无限,尤其是当您开始考虑上述所有模式可以完全组合和分支时。还值得注意的是,YaEtl
提取器支持按批提取记录,即使对于连接器也可能更小(例如,对于WHERE IN
查询类型的较小集合)。
+-----------+ +------------+
| | | |
| | | |
| users | | addresses |
| | | |
| | | |
+-----+-----+ +-----+------+
| |
| |
+-----v-----+ user +-----v------+ user & address
| Extractor +------> Joiner +---------------->
+-----------+ +------------+
资格
(>= 1.1.0) YaEtl
引入了QualifierInterface
,部分由QualifierAbstract
实现,并可直接使用CallableQualifier
类。资格的目标是增加流程条件(IF)和流程操作(转换和加载)之间的关注点分离,这反过来应该有助于编写更通用的转换器和加载器(它们不再需要保留所有条件),从而提高可重用性。
使用此类节点,您可以通过为每个场景实例化一个分支来共享慢速提取操作,每个分支都以一个负责根据其属性接受或拒绝记录的资格开始。
+------------------------------------------------+
+-----------+ | +----------+ |
| Extractor +----+----->Qualifier1+--->... Transform ... ---> Loader1|
+-----------+ | | +----------+ branch1 |
| +------------------------------------------------+
|
| +------------------------------------------------+
| | +----------+ |
+----->Qualifier2+--->... Transform ... ---> Loader2|
| | +----------+ branch2 |
| +------------------------------------------------+
|
| +------------------------------------------------+
| | +----------+ |
+----->QualifierN+--->... Transform ... ---> LoaderN|
| +----------+ branchN |
+------------------------------------------------+
在这个示例中,每条记录将被呈现给每个分支,每个限定符将负责在其分支中接受记录,以便其他节点可以对其执行操作。正如您所看到的,这种模式创造了大量重新使用现有节点作为下游转换器和加载器的机会,因为这些转换器和加载器不需要了解我们在限定符中选择的具体属性。这意味着您可以编写非常通用的加载器,严格负责将记录加载到某个地方,将默认值和格式化(字符集等)留给仅执行此操作的转换器,并通过仅实现包含条件逻辑的限定符在任何条件用例中重用这些转换器。
阅读限定符以了解更多关于限定的信息。
序列化
YaEtl
是可序列化的,但前提是它不携带闭包,这可能会在OnClose
对象中发生,因为PHP原生不支持闭包序列化,但有一些方法可以解决这个问题,如Opis Closure。
请参阅NodalFlow文档以了解有关序列化的更多有趣边缘案例。
要求
YaEtl
在php 7.2、7.3、7.4、8.0、8.1和8.2上进行了测试。
贡献
欢迎贡献。一种回馈的好方法是分享您在编写YaEtl
时可能编写的通用提取器(Redis、RedShift、LDAP等),因为这会直接造福每个人。在任何情况下,都不必犹豫,提出问题和提交拉取请求。
许可证
YaEtl
是开源软件,受MIT许可证许可。