三脉 / pipeline
通用集合管道
Requires
- php: ^7.4 || ^8.0
Requires (Dev)
- ergebnis/composer-normalize: ^2.8
- friendsofphp/php-cs-fixer: ^3.17
- infection/infection: >=0.10.5
- league/pipeline: ^0.3 || ^1.0
- phan/phan: >=1.1
- php-coveralls/php-coveralls: ^2.4.1
- phpstan/phpstan: >=0.10
- phpunit/phpunit: >=9.4
- vimeo/psalm: >=2
- v6.11
- v6.10
- v6.9
- v6.8.1
- v6.8
- v6.7
- v6.6
- v6.5
- v6.4
- v6.3
- v6.2
- v6.1
- v6.0.1
- v6.0
- v5.2.1
- v5.2.0
- v5.1.0
- v5.0.1
- v5.0
- v4.0.2
- v4.0.1
- v4.0.0
- v3.1.4
- v3.1.3
- v3.1.2
- v3.1.1
- v3.1
- v3.0.2
- v3.0.1
- v3.0
- v2.0.5
- v2.0.4
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0
- v1.5.5
- v1.5.4
- v1.5.3
- v1.5.2
- v1.5.1
- v1.5
- v0.3.6
- v0.3.5
- v0.3.4
- v0.3.3
- v0.3.2
- v0.3.1
- v0.3
- dev-pr/splice
- dev-main / 6.x-dev
This package is auto-updated.
Last update: 2024-09-15 03:49:50 UTC
README
Pipeline 是一个 PHP 库,它将流管道的力量带到你的代码中。受函数式编程语言中常见的管道操作符(|>
)的启发,Pipeline 允许你以清晰、简洁和可读的方式将一系列操作链接到你的数据上。这种方法不仅简化了复杂的数据转换,而且使代码更易于维护和测试。
Pipeline 使处理 iterable
类型变得尽可能容易,因此它是一个专门用于定制数据处理管道的完美工具,这就是其名称的由来。如果你曾经将几个 bash 命令串联起来,其中一个命令使用另一个命令的输出,这个库就是这样做的,但是是针对 PHP 函数、生成器、数组和迭代器的。
Pipeline 包含最重要的也是最基础的构建模块。它拥有对数据从任意生成器以及所有类型的标准迭代器进行映射、过滤、归约、压缩和解包的方法。
这个经过严格测试的库可以正常工作。Pipeline 既不定义也不抛出任何异常。
安装
composer require sanmai/pipeline
最新版本需要 PHP 7.4 或更高版本,包括 PHP 8.2 及以后的版本。
有一些更早的版本可以在 PHP 5.6 及以上版本下运行,但它们的功能并不完整。
使用
use function Pipeline\take; // iterable corresponds to arrays, generators, iterators // we use an array here simplicity sake $iterable = range(1, 3); // wrap the initial iterable with a pipeline $pipeline = take($iterable); // join side by side with other iterables of any type $pipeline->zip( \range(1, 3), map(function () { yield 1; yield 2; yield 3; }) ); // lazily process their elements together $pipeline->unpack(function (int $a, int $b, int $c) { return $a - $b - $c; }); // map one value into several more $pipeline->map(function ($i) { yield pow($i, 2); yield pow($i, 3); }); // simple one-to-one mapper $pipeline->cast(function ($i) { return $i - 1; }); // map into arrays $pipeline->map(function ($i) { yield [$i, 2]; yield [$i, 4]; }); // unpack array into arguments $pipeline->unpack(function ($i, $j) { yield $i * $j; }); // one way to filter $pipeline->map(function ($i) { if ($i > 50) { yield $i; } }); // this uses a filtering iterator from SPL under the hood $pipeline->filter(function ($i) { return $i > 100; }); // reduce to a single value; can be an array or any value $value = $pipeline->reduce(function ($carry, $item) { // for the sake of convenience the default reducer from the simple // pipeline does summation, just like we do here return $carry + $item; }, 0); var_dump($value); // int(104)
API 入口点
所有入口点总是返回管道的实例。
实例方法概述
Pipeline 是一个迭代器,可以用作任何其他可迭代对象。
Pipeline 可以用作 count()
的参数。实现了 Countable
接口。请注意,计数操作是一个终端操作。
通常,Pipeline 实例是可变的,这意味着每个返回 Pipeline 的方法都返回同一个 Pipeline 实例。这使我们能够在不犯明显的错误的同时,灵活地信任某人或某物向 Pipeline 实例添加处理阶段。例如,如果你添加一个处理阶段,它将保留在那里,无论你是否捕获返回值。这种特性在其他情况下可能会成为线程安全问题,但在 PHP 中这不是问题。
注意事项
-
由于大多数回调都是延迟评估的,随着更多数据的进出,你必须使用普通的
foreach
或使用reduce()
来确保处理发生。foreach ($pipeline as $result) { // Processing happens only if you consume the results. // Want to stop early after few results? Not a problem here! }
除非使用结果,否则几乎不会发生任何事情。这正是延迟评估的意义所在!
-
尽管如此,如果用于初始化管道的非生成器,它将被立即执行。
$pipeline = new \Pipeline\Standard(); $pipeline->map(function () { // will be executed immediately on the spot, unless yield is used return $this->veryExpensiveMethod(); })->filter();
在上面的情况下,管道将内部存储一个数组,并且当可能时,管道将积极地操作该数组。因此,当有疑问时,请使用生成器。
$pipeline->map(function () { // will be executed only as needed, when needed yield $this->veryExpensiveMethod(); })->filter();
-
在最佳努力的基础上保留生成的值的键,因此在使用
iterator_to_array()
处理管道时必须小心:具有重复键的值将被丢弃,并且对于给定的键,只返回最后一个值。$pipeline = \Pipeline\map(function () { yield 'foo' => 'bar'; yield 'foo' => 'baz'; }); var_dump(iterator_to_array($pipeline)); /* ['foo' => 'baz'] */
更安全的方法是使用提供的
toArray()
方法。它会返回所有值,无论使用哪些键,确保在过程中丢弃所有键。var_dump($pipeline->toArray()); /* ['bar', 'baz'] */
此方法还接受一个可选参数以保留键。
-
结果管道是一个迭代器,应该假定它不可重置,就像它使用的生成器一样。
$pipeline = \Pipeline\map(function () { yield 1; }); $sum = $pipeline->reduce(); // Won't work the second time though $pipeline->reduce(); // Exception: Cannot traverse an already closed generator
尽管在某些情况下,管道可以被重置并像常规数组一样重复使用,但用户不应对此行为做出任何假设,因为它不是API兼容性保证的一部分。
-
管道实现了
IteratorAggregate
,这与Iterator
不同。在需要后者的情况下,可以将管道包装在IteratorIterator
中。$iterator = new \IteratorIterator($pipeline); /** @var $iterator \Iterator */
-
再次遍历管道将导致未定义的行为。最好避免这样做。
类和接口:概述
\Pipeline\Standard
是管道的主要用户界面类,对于大多数方法提供了合理的默认值。
这个库是为了持久使用而构建的。没有抛出异常的地方。不要考虑任何断言。
方法
__construct()
接受 Traversable
的实例或无实例。在后一种情况下,管道必须通过将初始生成器传递给 map
方法来初始化。
$pipeline->map()
接受一个以生成器函数或纯映射函数形式存在的处理阶段。如果没有提供回调,则不执行任何操作。
$pipeline->map(function (Customer $customer) { foreach ($customer->allPayments() as $item) { yield $item; } }); // Now process all and every payment $pipeline->map(function (Payment $payment) { return $payment->amount; });
也可以接受一个初始生成器,但该生成器不得需要任何参数。
$pipeline = new \Pipeline\Standard(); $pipeline->map(function () { yield $this->foo; yield $this->bar; });
$pipeline->flatten()
展开输入
$pipeline->map(function () { yield [1]; yield [2, 3]; })->unpack()->toArray(); // [1, 2, 3]
$pipeline->unpack()
是 map
的一个额外变体,它将数组解包为回调的参数。
使用 map()
时,你会这样做:
$pipeline->map(function ($args) { list ($a, $b) = $args; // and so on });
使用 unpack()
时,这些操作会由幕后为你完成
$pipeline->map(function () { yield [-1, [10, 20], new DateTime()]; }); $pipeline->unpack(function ($a, array $b, \DateTime ...$dates) { // and so on });
你还可以轻松进行各种标准类型检查。
如果没有回调,unpack()
的默认回调将像 flatten()
一样展开输入。
$pipeline->cast()
与 map
类似工作,但不为生成器提供特殊处理。想想 array_map
。
$pipeline->cast(function (Customer $customer) { foreach ($customer->allPayments() as $item) { yield $item; } }); $pipeline->map(function (\Generator $paymentGenerator) { // Keeps grouping as per customer });
在这个例子中,map()
将填充管道的一系列付款,而 cast()
将为每个客户添加一个生成器。
$pipeline->zip()
将多个可迭代对象连接在一起,形成一个元素并排的饲料。
$pipeline = take($iterableA); $pipeline->zip($iterableB, $iterableC); $pipeline->unpack(function ($elementOfA, $elementOfB, $elementOfC) { // ... });
对于元素数量不等的迭代器,缺失的元素将保留为 null。
$pipeline->filter()
接受一个类似于 array_filter
的过滤器回调。
$pipeline->filter(function ($item) { return $item->isGood() && $item->amount > 0; });
管道具有默认回调,其效果与 array_filter
相同:它会移除所有假值。
$pipeline->slice()
接受偏移量和长度参数,其工作方式与 array_slice
非常相似,其中 $preserve_keys
设置为 true。
$pipeline->slice(1, -1);
此示例将删除序列的第一和最后一个元素。
实现使用滚动窗口缓冲区处理负偏移量和长度值,并在输入数组上回退到普通的 array_slice
。
$pipeline->reduce()
接受一个类似于 array_reduce
的减少回调,该回调有两个参数,用于上一迭代的值和当前项。作为第二个参数,它可以接受一个初始值。
$total = $pipeline->reduce(function ($curry, $item) { return $curry + $item->amount; }, 0);
管道具有默认回调,它将求和所有值。
$pipeline->toArray()
返回包含管道中所有值的数组。所有数组键都被忽略,以确保返回每个值。
// Yields [0 => 1, 1 => 2] $pipeline = map(function () { yield 1; yield 2; }); // For each value yields [0 => $i + 1, 1 => $i + 2] $pipeline->map(function ($i) { yield $i + 1; yield $i + 2; }); $result = $pipeline->toArray(); // Since keys are ignored we get: // [2, 3, 3, 4]
如果在示例中使用 iterator_to_array($result)
,他们将得到 [3, 4]
。
$pipeline->each()
使用提供的回调急切地遍历序列。
$pipeline->each(function ($i) { $this->log("Saw $i"); });
除非由第二个参数指示,否则在迭代后丢弃序列。
$pipeline->getIterator()
实现符合 Traversable
接口的方法。如果使用未经初始化的 \Pipeline\Standard
,它将返回一个空数组迭代器,本质上是一个无操作管道。因此,这应该不会出错。
$pipeline = new \Pipeline\Standard(); foreach ($pipeline as $value) { // no errors here }
这允许在没有任何结果返回的情况下跳过返回值的类型检查:而不是返回 false
或 null
,安全地返回一个未经初始化的管道。
$pipeline->runningVariance()
计算序列的在线统计信息:计数、样本均值、样本方差、标准差。您可以使用如 getCount()
、getMean()
、getVariance()
、getStandardDeviation()
等方法实时访问这些数字。
此方法还接受一个可选的强制转换回调,该回调应返回 float|null
:丢弃 null
值。因此,您可以计算来自数据不同部分的多个运行方差。
$pipeline->runningVariance($varianceForShippedOrders, static function (order $order): ?float { if (!$order->isShipped()) { // This order will be excluded from the computation. return null; } return $order->getTotal(); }); $pipeline->runningVariance($varianceForPaidOrders, static function (order $order): ?float { if ($order->isUnpaid()) { // This order will be excluded from the computation. return null; } return $order->getProjectedTotal(); });
在处理订单时,您将能够访问 $varianceForShippedOrders->getMean()
和 $varianceForPaidOrders->getMean()
。
此计算使用 Welford 的在线算法,因此可以处理非常大的数据点数量。
$pipeline->finalVariance()
一个方便的方法来计算序列的最终统计数据。接受一个可选的强制转换方法,否则假定序列包含有效的数字。
// Fibonacci numbers generator $fibonacci = map(function () { yield 0; $prev = 0; $current = 1; while (true) { yield $current; $next = $prev + $current; $prev = $current; $current = $next; } }); // Statistics for the second hundred Fibonacci numbers. $variance = $fibonacci->slice(101, 100)->finalVariance(); $variance->getStandardDeviation(); // float(3.5101061922557E+40) $variance->getCount(); // int(100)
贡献
欢迎对文档和测试用例的贡献。也欢迎错误报告。
尽管如此,API 预计将保持其简单性。
关于集合管道的一般信息
关于 Martin Fowler 的 集合管道编程模式。
在更广泛的意义上,这个库实现了 CSP(通信顺序进程)范式的子集,而不是 Actor 模型。
还有什么
- Hack 的管道运算符与此类似,但不会用于生成器,并且在常规 PHP 中也不工作。查看 JavaScript 中类似运算符的提案。
nikic/iter
提供了如 array_map 等函数,但返回的是惰性生成器。您需要相当多的粘合剂才能做到与 Pipeline 相同的事情,更不用说缺少的功能。- League\Pipeline 仅适用于单个值。名称相似,但用途完全不同。不打算与值序列一起工作。每个阶段只能返回一个值。
- Illuminate\Support\Collection 是一个用于处理数据数组的流畅包装器。只能与数组一起工作,也是预期为仅数组包装器的不可变。
- Knapsack 是一个相似的选择。可以接受可遍历的输入,具有惰性评估。但不能从单个输入中生成多个值。为需要它们的人提供了大量的实用函数:它们超出了此项目的范围。
- transducers.php 如果您已经熟悉 Clojure 中的 transducers,则值得仔细查看。API 并非非常 PHP 风格。阅读时可能不太友好。作者详细的撰写。
- Lars Strojny 等人编写的 PHP 函数的原语旨在补充现有的 PHP 函数,确实如此,尽管它也受到了与
array_map
和array_filter
相同的一些局限。没有方法链。 - Chain 提供了一种一致且可链式的方式来在 PHP 中处理数组,但仅限于数组。不支持延迟评估。
- Hugh Grigg 的使用 PHP 生成器实现简单管道。对一个非常接近的概念进行了论证和解释。可能可以将这个库作为方法名不同的直接替换项使用。
- 就处理多吉字节大小的日志文件而言,loophp 的 Collection 看起来是这个库的一个可行的替代品。它支持流畅的接口。支持不可变性作为第一原则,尽管 PHP 的生成器本质上是可以变的。
- 如果你熟悉 Java,java.util.stream 包 提供了相同概念的实现。
- Scala 的集合类型。
提交一个 PR 添加你的内容。