clue/reactphp-flux

Flux,一个轻量级的流处理器,可以同时并发处理许多(但不是太多)事情,建立在ReactPHP之上。

v1.4.0 2023-04-21 11:40 UTC

This package is auto-updated.

Last update: 2024-09-04 18:28:34 UTC


README

CI status code coverage installs on Packagist

Flux,一个轻量级的流处理器,可以同时并发处理许多(但不是太多)事情,建立在 ReactPHP 之上。

假设你有一个包含大量用户或产品的列表,你希望逐个通过发送(RESTful)HTTP API请求到第三方API来处理每条记录。如果估计每个调用需要大约 0.3s,那么如果按顺序处理 10000 个用户,你将不得不等待大约 50 分钟才能完成所有工作。这对于少量操作来说效果很好,但将成千上万的工作同时保存在内存中可能会轻易耗尽你的资源。相反,你可以使用这个库将任意大小的输入列表作为单独的记录流到一个非阻塞(异步)转换处理器。它使用 ReactPHP 来使你能够同时处理多个记录。你可以控制并发限制,允许同时处理 10 个操作,因此你可以将这个大输入列表处理得大约快 10 倍,同时你也不再受列表中记录数量的限制(想想处理数百万条记录)。这个库提供了一个简单易用的API,以便在不触及大多数底层细节的情况下管理任何类型的异步操作。你可以用它来限制多个HTTP请求、数据库查询或几乎所有已使用Promise的API。

  • 异步执行操作 - 选择一次应处理多少异步操作(并发)。在收到响应后立即处理它们的结果。基于Promise的设计提供了一个合理的接口来处理无序结果。
  • 标准接口 - 通过实现ReactPHP的标准 promisesstreaming interfaces,允许轻松集成现有的高级组件。
  • 轻量级,SOLID设计 - 提供了一个薄抽象层,它恰到好处,不会阻碍你。它建立在经过良好测试的组件和已建立的概念之上,而不是重新发明轮子。
  • 良好的测试覆盖率 - 附带一个 自动测试套件,并在实际环境中定期测试。

目录

支持我们

我们在开发、维护和更新我们出色的开源项目上投入了大量的时间。您可以通过成为GitHub上的赞助者来帮助我们保持工作的高质量。赞助者将获得许多回报,有关详情请参阅我们的赞助页面

让我们共同将这些项目提升到新的水平!🚀

快速入门示例

安装后,您可以使用以下代码通过为每个用户记录发送(RESTful)HTTP API请求来处理示例用户列表

<?php

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();

$concurrency = isset($argv[1]) ? $argv[1] : 3;

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
    // skip users that do not have an IP address listed
    if (!isset($user['ip'])) {
        return React\Promise\resolve($user);
    }

    // look up country for this IP
    return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
        function (Psr\Http\Message\ResponseInterface $response) use ($user) {
            // response successfully received
            // add country to user array and return updated user
            $user['country'] = (string)$response->getBody();

            return $user;
        }
    );
});

// load a huge number of users to process from NDJSON file
$input = new Clue\React\NDJson\Decoder(
    new React\Stream\ReadableResourceStream(
        fopen(__DIR__ . '/users.ndjson', 'r')
    ),
    true
);

// process all users by piping through transformer
$input->pipe($transformer);

// log transformed output results
$transformer->on('data', function ($user) {
    echo $user['name'] . ' is from ' . $user['country'] . PHP_EOL;
});
$transformer->on('end', function () {
    echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

请参阅示例

通过更改$concurrency参数,您可以看到在不并发处理此列表时大约需要4s,而使用5的并发设置只需大约1s(当然,实际效果可能会有所不同)。

使用方法

转换器

Transformer将所有输入数据通过其转换处理程序传递,并转发结果输出数据。

它使用ReactPHP的标准流接口,这些接口允许在不将所有内容一次性存储在内存中的情况下处理大量输入,并允许您高效地分块处理其输入。您写入此流的任何数据都将通过其转换处理程序传递,该处理程序负责处理和转换这些数据,并负责管理流吞吐量和背压。

转换处理程序可以是任何非阻塞(异步)可调用函数,它使用承诺来表示其最终结果。该可调用函数接收一个数据参数,作为传递给可写端的数据,并必须返回一个承诺。成功的履行值将转发到流的可读端,而失败的反驳值将触发一个error事件,然后关闭流。

可以使用new Transformer(int $concurrency, callable $handler)调用创建一个新的转换器实例。您可以根据需要创建任意数量的转换流,例如当您想要对不同类型的流应用不同的转换时。

$concurrency参数设置了一个新的软限制,用于最大并发处理作业数。找到一个合适的并发限制取决于您的特定用例。通常会将并发限制在一个相对较小的值,因为同时做超过十几件事情可能会轻易压垮接收方。使用1的值将确保所有作业依次处理,从而有效地创建一个“瀑布”作业流。使用小于1的值将抛出InvalidArgumentException异常。

// handle up to 10 jobs concurrently
$transformer = new Transformer(10, $handler);
// handle each job after another without concurrency (waterfall)
$transformer = new Transformer(1, $handler);

$handler参数必须是一个有效的可调用函数,它接受您的作业参数(从其可写端的数据),调用适当的操作,并返回一个承诺作为其未来结果的占位符(该结果将在其可读端提供)。

// using a Closure as handler is usually recommended
$transformer = new Transformer(10, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$transformer = new Transformer(10, array($browser, 'get'));

继续阅读有关承诺的更多信息。

Promise

此库假设您想同时处理使用基于承诺的API的异步操作。您可以使用此功能并发运行多个HTTP请求、数据库查询或几乎所有使用承诺的API。

出于演示目的,本文档中的示例使用ReactPHP的异步HTTP客户端。其API可以按如下方式使用

$browser = new React\Http\Browser();

$promise = $browser->get($url);

如果您将此包裹在上述给定的Transformer实例中,则此代码将看起来像这样

$browser = new React\Http\Browser();

$transformer = new Transformer(10, function ($url) use ($browser) {
    return $browser->get($url);
});

$transformer->write($url);

《$transformer》实例是一个《WritableStream》接口,因此使用《write($data)》写入它实际上会被转发为《$browser->get($data)》,正如《$handler》参数所提供的(关于这一点,请参阅下一节关于《streaming》的说明)。

每个操作都期望是异步(非阻塞)的,因此您可以实际并发调用多个处理器(并行发送多个请求)。《$handler》负责对每个请求返回一个解析值,顺序不保证。这些操作使用基于《Promise》的接口,这使得在操作完成时(即成功解决或因错误被拒绝)做出反应变得很容易。

$transformer = new Transformer(10, function ($url) use ($browser) {
    $promise = $browser->get($url);

    return $promise->then(
        function ($response) {
            var_dump('Result received', $result);

            return json_decode($response->getBody());
        },
        function (Exception $e) {
            echo 'Error: ' . $e->getMessage() . PHP_EOL;

            throw $error;
        }
    );
);

每个操作可能需要一些时间才能完成,但由于其异步性质,您可以实际启动任意数量的(排队)操作。一旦达到并发限制,此调用将简单地排队,并将此流通知写入方暂停写入,从而有效地限制写入方(反压)。一旦另一个操作完成,它将自动启动下一个操作,并通知写入方可以恢复写入。这意味着这是完全透明的处理,您无需担心并发限制。

此示例期望输入URI字符串,发送简单的HTTP GET请求,并返回JSON解码的HTTP响应体。您可以将满足值转换为在流的读取端可用的任何内容。可以使用类似逻辑来过滤输入流,例如跳过某些输入值或通过返回被拒绝的承诺来拒绝它。相应地,返回被拒绝的承诺(相当于抛出《Exception》)将导致《error》事件尝试取消所有挂起操作,然后关闭流。

超时

默认情况下,此库不限制单个操作可以持续多长时间,因此转换处理器可能长时间挂起。许多用例涉及某种“超时”逻辑,以便在达到某个阈值后取消操作。

您可以使用《react/promise-timer》,它通过简单的API帮助处理这一点。

应用超时后的代码看起来像这样

use React\Promise\Timer;

$transformer = new Transformer(10, function ($uri) use ($browser) {
    return Timer\timeout($browser->get($uri), 2.0);
});

$transformer->write($uri);

生成的流可以像往常一样消费,上述代码将确保此操作的执行时间不超过指定的超时时间(即实际启动后)。

有关更多详细信息,请参阅《react/promise-timer》。

流式处理

《Transformer》实现了《DuplexStreamInterface》,因此允许您向其可写输入端写入,并从其可读输出端消费。您写入此流的任何数据都将通过其转换处理器传递,该处理器负责处理和转换这些数据(有关更多详细信息,请参阅上文)。

《Transformer》负责将您通过其可写端传递的数据传递到转换处理器参数,并将结果数据转发到其可读端。每个操作可能需要一些时间才能完成,但由于其异步性质,您可以实际启动任意数量的(排队)操作。一旦达到并发限制,此调用将简单地排队,并将此流通知写入方暂停写入,从而有效地限制写入方(反压)。一旦另一个操作完成,它将自动启动下一个操作,并通知写入方可以恢复写入。这意味着这是完全透明的处理,您无需担心并发限制。

以下示例使用如上所示的异步(非阻塞)转换处理器

$browser = new React\Http\Browser();

$transformer = new Transformer(10, function ($url) use ($browser) {
    return $browser->get($url);
});

可以使用write(mixed $data): bool方法通过转换处理器转换数据,如下所示

$transformer->on('data', function (ResponseInterface $response) {
    var_dump($response);
});

$transformer->write('http://example.com/');

此处理器接收一个数据参数,作为传递给可写端的参数,并必须返回一个promise。成功的履行值将被转发到流的可读端,而不成功的拒绝值将触发一个error事件,尝试cancel()所有挂起的操作,然后close()流。

请注意,此类对任何数据类型都没有假设。无论写入什么,都将由转换处理器进行处理。转换处理器产生的任何内容都将转发到其可读端。

end(mixed $data = null): void方法可以在所有转换处理器完成后软关闭流。它将关闭可写端,等待所有挂起的转换处理器完成,然后触发一个end事件,然后close()流。您可以可选地传递一个(非null)的$data参数,它将被像write($data)调用立即后跟一个end()调用一样处理。

$transformer->on('data', function (ResponseInterface $response) {
    var_dump($response);
});
$transformer->on('end', function () {
    echo '[DONE]' . PHP_EOL;
});

$transformer->end('http://example.com/');

close(): void方法可以用来强制关闭流。它将尝试cancel()所有挂起的转换处理器,然后立即关闭流并触发一个close事件。

$transformer->on('data', $this->expectCallableNever());
$transformer->on('close', function () {
    echo '[CLOSED]' . PHP_EOL;
});

$transformer->write('http://example.com/');
$transformer->close();

pipe(WritableStreamInterface $dest): WritableStreamInterface方法可以用来将输入流转发到转换器,并将生成的输出流转发到另一个流。

$source->pipe($transformer)->pipe($dest);

这种管道上下文特别强大,因为它将自动节流传入的源流,并在转换处理器完成之前等待(背压)。任何额外的数据事件都将排队在内存中,并在适当的时候恢复。这样,它允许您限制同时处理的操作数量。

因为流是ReactPHP的核心抽象之一,所以为许多不同的用例提供了大量的流实现。例如,这允许您使用以下伪代码发送压缩NDJSON文件中每个JSON对象的HTTP请求

$transformer = new Transformer(10, function ($data) use ($http) {
    return $http->post('https://example.com/?id=' . $data['id'])->then(
        function ($response) use ($data) {
            return array('done' => $data['id']);
        }
    );
});

$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);

$transformer->on('error', function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

请注意,转换处理器可能会返回一个被拒绝的promise。在这种情况下,流将触发一个error事件,然后close()流。如果您不希望在这种情况下结束流,您必须显式处理任何被拒绝的promises,并返回一些占位符值,例如如下所示

$uploader = new Transformer(10, function ($data) use ($http) {
    return $http->post('https://example.com/?id=' . $data['id'])->then(
        function ($response) use ($data) {
            return array('done' => $data['id']);
        },
        function ($error) use ($data) {
            // HTTP request failed => return dummy indicator
            return array(
                'failed' => $data['id'],
                'reason' => $error->getMessage()
            );
        }
    );
});

all()

静态方法all(ReadableStreamInterface $input, int $concurrency, callable $handler): PromiseInterface<int,Exception>可以用来并发处理输入流中的所有作业通过给定的$handler

这是一个便利方法,它使用内部Transformer来安排输入流中的所有作业,同时限制并发,确保一次不会运行超过$concurrency作业。它将返回一个promise,在成功时解析为所有成功的作业总数。

$browser = new React\Http\Browser();

$promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
    return $browser->post($url, [], json_encode($data));
});

$promise->then(function ($count) {
    echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

如果任何作业失败,它将拒绝产生的promise,关闭输入流,并尝试取消所有挂起的作业。在挂起的promise上调用cancel()将关闭输入流并尝试取消所有挂起的作业。类似地,如果$input流触发一个error事件,它将拒绝产生的promise并尝试取消所有挂起的作业。

参数$input必须是一个ReadableStreamInterface,它为每个待处理作业发出一个data事件。每个元素将被传递到$handler中以启动一个作业。每个作业的完成值将被忽略,因此为了最佳性能,建议不要返回任何过度的数据结构。当流发出endclose事件时,该方法将等待所有未完成的作业完成,然后以成功作业的数量解析。如果此流已关闭或未发出任何data事件,则该方法将解析为0值,而不会处理任何作业。

$input = new ThroughStream();

$promise = Transformer::all($input, 2, $handler);

$input->write('a');
$input->write('b');
$input->write('c');
$input->end();

由于流是ReactPHP的核心抽象之一,因此有许多不同用例的流实现可用。例如,这允许您使用clue/reactphp-ndjsonclue/reactphp-csv来处理大量结构化输入数据。有关更多详细信息,请参阅

参数$concurrency设置了一个新的软限制,以处理并发作业的最大数量。找到合适的并发限制取决于您的特定用例。通常将并发限制设置得相对较小,因为同时做超过十几件事情可能会轻易压倒接收端。使用1值将确保所有作业依次处理,从而有效地创建一个“瀑布”式的作业流。使用小于1的值将拒绝处理任何作业,并抛出InvalidArgumentException

// handle up to 10 jobs concurrently
$promise = Transformer::all($stream, 10, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Transformer::all($stream, 1, $handler);

参数$handler必须是一个有效的可调用对象,它接受您的作业参数(来自$input流的 数据),调用适当的操作,并以占位符形式返回Promise表示其未来的结果。每个作业的完成值将被忽略,因此为了最佳性能,建议不要返回任何过度的数据结构。如果给定的参数不是有效的可调用对象,则该方法将抛出InvalidArgumentException而不会处理任何作业。

// using a Closure as handler is usually recommended
$promise = Transformer::all($stream, 10, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Transformer::all($stream, 10, array($browser, 'get'));

请注意,此方法仅在所有操作都成功的情况下才返回一个解析为成功操作总数量的Promise。这主要是一个方便的方法,它使用底层的Transformer。如果您的输入数据足够小(几十或几百个操作),可以放入内存中,您可能希望使用clue/reactphp-mq而不是流方法,并保留所有操作在内存中。

any()

静态方法any(ReadableStreamInterface $input, int $concurrency, callable $handler): PromiseInterface<mixed,Exception>可用于通过给定的$handler并发处理输入流中的某些作业。

这是一个方便的方法,它使用Transformer内部调度输入流中的作业,同时限制并发性,确保一次最多只运行$concurrency个作业。它将返回一个Promise,它在成功时解析为第一个成功的解析值。

$browser = new React\Http\Browser();

$promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
    return $browser->post($url, [], json_encode($data));
});

$promise->then(function (ResponseInterface $response) {
    echo 'First successful job: ' . $response->getBody() . PHP_EOL;
}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

如果第一个作业成功,它将使用其解析值解析结果Promise,关闭输入流,并尝试取消所有其他未完成的作业。

如果任一作业失败,它将保持挂起状态,并等待其他作业之一成功。如果所有作业都失败,它将拒绝结果Promise。在挂起的Promise上调用cancel()将关闭输入流,并尝试取消所有未完成的作业。类似地,如果$input流发出一个error事件,它将拒绝结果Promise,并尝试取消所有未完成的作业。

输入参数 $input 必须是一个 ReadableStreamInterface,为每个待处理的工作项触发一个 data 事件。每个元素将被传递到 $handler 以启动一个工作项。第一个成功工作项的完成值将用于满足返回的 Promise。当流触发 endclose 事件时,此方法将等待所有未完成的工作项完成,然后相应地解析或拒绝。如果此流已经关闭或未触发任何 data 事件,则此方法将不处理任何工作项,并以 UnderflowException 拒绝。

$input = new ThroughStream();

$promise = Transformer::any($input, 2, $handler);

$input->write('a');
$input->write('b');
$input->write('c');
$input->end();

由于流是ReactPHP的核心抽象之一,因此有许多不同用例的流实现可用。例如,这允许您使用clue/reactphp-ndjsonclue/reactphp-csv来处理大量结构化输入数据。有关更多详细信息,请参阅

参数$concurrency设置了一个新的软限制,以处理并发作业的最大数量。找到合适的并发限制取决于您的特定用例。通常将并发限制设置得相对较小,因为同时做超过十几件事情可能会轻易压倒接收端。使用1值将确保所有作业依次处理,从而有效地创建一个“瀑布”式的作业流。使用小于1的值将拒绝处理任何作业,并抛出InvalidArgumentException

// handle up to 10 jobs concurrently
$promise = Transformer::any($stream, 10, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Transformer::any($stream, 1, $handler);

$handler 参数必须是一个有效的可调用对象,它接受您的作业参数(来自 $input 流的数据),调用适当的操作,并返回一个 Promise 作为其未来结果的占位符。第一个成功工作项的完成值将用于满足返回的 Promise。如果提供的参数不是一个有效的可调用对象,则此方法将不处理任何工作项,并以 InvalidArgumentException 拒绝。

// using a Closure as handler is usually recommended
$promise = Transformer::any($stream, 10, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Transformer::any($stream, 10, array($browser, 'get'));

请注意,此方法仅当任何操作成功时才返回一个解析为第一个成功解析值的 Promise。这主要是一个方便的方法,它底层使用 Transformer。如果您的输入数据足够小,可以放入内存(几十或几百个操作),您可能希望使用 clue/reactphp-mq 而不是使用流方法,并将所有操作保留在内存中。

安装

推荐通过 Composer 安装此库。您是 Composer 新手吗?

此项目遵循 SemVer。这将安装最新的支持版本。

composer require clue/reactphp-flux:^1.4

有关版本升级的详细信息,请参阅 CHANGELOG

此项目旨在在任何平台上运行,因此不需要任何 PHP 扩展,并支持在当前的 PHP 8+ 和 HHVM 以及旧版 PHP 5.3 上运行。强烈建议使用此项目支持的最新 PHP 版本

测试

要运行测试套件,您首先需要克隆此存储库,然后通过 Composer 安装所有依赖项

composer install

要运行测试套件,请转到项目根目录并运行

vendor/bin/phpunit

测试套件已设置,始终确保在所有支持的环境中实现 100% 代码覆盖率。如果您已安装 Xdebug 扩展,您还可以本地生成代码覆盖率报告,如下所示

XDEBUG_MODE=coverage vendor/bin/phpunit --coverage-text

许可证

此项目在 MIT 许可证 下发布。

您知道吗?我提供定制开发服务,并为发布赞助和贡献发布发票。请联系我(@clue)以获取详细信息。

更多

  • 如果您想了解更多关于处理数据流的信息,请参阅底层 react/stream 组件的文档。

  • 如果您只想处理几十或几百个操作,您可能希望使用 clue/reactphp-mq 而不是使用流方法,并将所有操作保留在内存中。

  • 如果您想处理结构化的 NDJSON 文件(.ndjson 扩展名),您可能希望在将解码后的流传递到转换器之前,在输入流上使用 clue/reactphp-ndjson

  • 如果您想处理压缩的 GZIP 文件(.gz 扩展名),您可能希望在将解压缩后的流传递到解码器(如 NDJSON)之前,在压缩的输入流上使用 clue/reactphp-zlib