flow-php/etl-adapter-amphp

该包已被废弃,不再维护。未建议替代包。

PHP ETL - 基于 amphp 的异步适配器

0.4.0 2023-10-13 09:59 UTC

This package is auto-updated.

Last update: 2023-12-05 13:08:08 UTC


README

Flow PHP 的 Adapter AmpHP 是一个精心制作的库,旨在将 AmpHP 的异步功能集成到您的 ETL(提取、转换、加载)工作流程中。此适配器对于希望执行非阻塞数据操作的开发商来说至关重要,从而优化数据转换工作流程的性能和响应能力。通过利用 Adapter AmpHP 库,开发商可以访问一套为精确异步数据操作而设计的强大功能,简化复杂的数据转换,同时提高操作效率。Adapter AmpHP 库封装了一系列功能,提供了一种简化的 API,用于管理异步任务,这在现代数据处理和转换领域是必不可少的。这个库反映了 Flow PHP 在提供灵活和高效数据处理解决方案方面的承诺,使其成为处理大规模和数据密集型环境中异步操作的开发商的首选。使用 Flow PHP 的 Adapter AmpHP,将异步数据处理集成到您的 ETL 工作流程中变得无缝且高效,与 Flow PHP 生态系统的强大且适应性强的框架和谐一致。

以下通信协议受支持

  • TCP/IP(仅本地) - 127.0.0.1:6651
  • Unix 域套接字 - uinx:///var/run/etl.sock

安装

composer require flow-php/etl-adapter-amphp:1.x@dev

示例用法

<?php

use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Adapter\CSV\League\CSVExtractor;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\Monitoring\Memory\Consumption;
use Flow\ETL\Pipeline\LocalSocketPipeline;
use Flow\ETL\Async\ReactPHP\Worker\ChildProcessLauncher;
use Flow\ETL\Async\ReactPHP\Server\SocketServer;
use Flow\ETL\DSL\Transform;
use Flow\ETL\Flow;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LogLevel;

require __DIR__ . '/vendor/autoload.php';

$logger = new Logger('server');
$logger->pushHandler(new StreamHandler("php://stdout", LogLevel::DEBUG, false));
$logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));

(new Flow)
    ->read(new CSVExtractor(
        $path = __DIR__ . '/data/dataset.csv',
        10_000,
        0
    ))
    ->pipeline(
        new LocalSocketPipeline(
            SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
            new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-amp", $logger),
            $workers = 8
        )
    )
    ->withEntry('id', ref('id')->cast('int'))
    ->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
    ->drop('last_name')
    ->load(new DbalLoader($tableName, $dbConnectionParams))
    ->run();

此适配器自带内置的 worker CLI 应用程序,但您也可以自由创建自定义应用程序。对工作的自定义将允许您调整记录器或自动加载器。