flow-php/etl-adapter-reactphp

该软件包已被废弃,不再维护。没有建议的替代软件包。

PHP ETL - 基于 ReactPHP 的异步适配器

0.4.0 2023-10-13 09:59 UTC

This package is auto-updated.

Last update: 2023-12-05 13:14:34 UTC


README

Flow PHP 的 Adapter ReactPHP 是一个独特的库,精心设计以将 ReactPHP 的异步功能与 Flow PHP 的 ETL(提取、转换、加载)流程集成。此适配器对于力求执行非阻塞数据操作的开发者至关重要,从而确保数据转换工作流程中的最佳性能和响应性。通过利用 Adapter ReactPHP 库,开发者可以充分利用针对异步数据操作精心打造的强大功能集,从而简化复杂的数据转换并提高操作效率。Adapter ReactPHP 库封装了丰富的功能,提供了一套管理异步任务的流畅 API,这在现代数据处理和转换环境中至关重要。这个库体现了 Flow PHP 致力于提供灵活高效的解决方案,使其成为处理大规模和高强度数据环境的开发者的理想选择。借助 Flow PHP 的 Adapter ReactPHP,在 ETL 工作流程中采用异步数据处理变得无缝高效,与 Flow PHP 生态系统的灵活和强大框架完美契合。

支持以下通信协议

  • TCP/IP(仅本地)- 127.0.0.1:6651
  • Unix 域套接字 - uinx:///var/run/etl.sock

安装

composer require flow-php/etl-adapter-reactphp:1.x@dev

工作示例

<?php

use function Flow\ETL\DSL\concat;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use Aeon\Calendar\Stopwatch;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Flow\ETL\Adapter\CSV\League\CSVExtractor;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\Cache\InMemoryCache;
use Flow\ETL\Config;
use Flow\ETL\Monitoring\Memory\Consumption;
use Flow\ETL\Pipeline\LocalSocketPipeline;
use Flow\ETL\Async\ReactPHP\Worker\ChildProcessLauncher;
use Flow\ETL\Async\ReactPHP\Server\SocketServer;
use Flow\ETL\DSL\Transform;
use Flow\ETL\Flow;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LogLevel;

require __DIR__ . '/vendor/autoload.php';

$logger = new Logger('server');
$logger->pushHandler(new StreamHandler("php://stdout", LogLevel::DEBUG, false));
$logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));

$stopwatch = new Stopwatch();
$stopwatch->start();

$memory = new Consumption();

(new Flow)
    ->read(new CSVExtractor(
        $path = __DIR__ . '/data/dataset.csv',
        10_000,
        0
    ))
    ->pipeline(
        new LocalSocketPipeline(
            SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
            new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-reactphp", $logger),
            $workers = 8
        )
    )
    ->withEntry('id', ref('id')->cast('int'))
    ->withEntry('name', concat(ref('name'), lit(' '), ref('last name')))
    ->drop('last_name')
    ->load(new DbalLoader($tableName, $dbConnectionParams))
    ->run();

此适配器包含内置的worker CLI 应用程序,但您可以自由创建自定义应用。对工作的自定义将允许您调整记录器或自动加载器。