三脉/pipeline

通用集合管道

资助包维护!
三脉

安装: 13,166,119

依赖者: 14

建议者: 1

安全: 0

星标: 59

关注者: 5

分支: 4

开放问题: 14

v6.11 2024-06-15 03:11 UTC

README

Total Downloads Latest Stable Version CI Coverage Status Type Coverage

Pipeline 是一个 PHP 库,它将流管道的力量带到你的代码中。受函数式编程语言中常见的管道操作符(|>)的启发,Pipeline 允许你以清晰、简洁和可读的方式将一系列操作链接到你的数据上。这种方法不仅简化了复杂的数据转换,而且使代码更易于维护和测试。

Pipeline 使处理 iterable 类型变得尽可能容易,因此它是一个专门用于定制数据处理管道的完美工具,这就是其名称的由来。如果你曾经将几个 bash 命令串联起来,其中一个命令使用另一个命令的输出,这个库就是这样做的,但是是针对 PHP 函数、生成器、数组和迭代器的。

Pipeline 包含最重要的也是最基础的构建模块。它拥有对数据从任意生成器以及所有类型的标准迭代器进行映射、过滤、归约、压缩和解包的方法。

这个经过严格测试的库可以正常工作。Pipeline 既不定义也不抛出任何异常。

安装

composer require sanmai/pipeline

最新版本需要 PHP 7.4 或更高版本,包括 PHP 8.2 及以后的版本。

有一些更早的版本可以在 PHP 5.6 及以上版本下运行,但它们的功能并不完整。

使用

use function Pipeline\take;

// iterable corresponds to arrays, generators, iterators
// we use an array here simplicity sake
$iterable = range(1, 3);

// wrap the initial iterable with a pipeline
$pipeline = take($iterable);

// join side by side with other iterables of any type
$pipeline->zip(
    \range(1, 3),
    map(function () {
        yield 1;
        yield 2;
        yield 3;
    })
);

// lazily process their elements together
$pipeline->unpack(function (int $a, int $b, int $c) {
    return $a - $b - $c;
});

// map one value into several more
$pipeline->map(function ($i) {
    yield pow($i, 2);
    yield pow($i, 3);
});

// simple one-to-one mapper
$pipeline->cast(function ($i) {
    return $i - 1;
});

// map into arrays
$pipeline->map(function ($i) {
    yield [$i, 2];
    yield [$i, 4];
});

// unpack array into arguments
$pipeline->unpack(function ($i, $j) {
    yield $i * $j;
});

// one way to filter
$pipeline->map(function ($i) {
    if ($i > 50) {
        yield $i;
    }
});

// this uses a filtering iterator from SPL under the hood
$pipeline->filter(function ($i) {
    return $i > 100;
});

// reduce to a single value; can be an array or any value
$value = $pipeline->reduce(function ($carry, $item) {
    // for the sake of convenience the default reducer from the simple
    // pipeline does summation, just like we do here
    return $carry + $item;
}, 0);

var_dump($value);
// int(104)

API 入口点

所有入口点总是返回管道的实例。

实例方法概述

Pipeline 是一个迭代器,可以用作任何其他可迭代对象。

Pipeline 可以用作 count() 的参数。实现了 Countable 接口。请注意,计数操作是一个终端操作

通常,Pipeline 实例是可变的,这意味着每个返回 Pipeline 的方法都返回同一个 Pipeline 实例。这使我们能够在不犯明显的错误的同时,灵活地信任某人或某物向 Pipeline 实例添加处理阶段。例如,如果你添加一个处理阶段,它将保留在那里,无论你是否捕获返回值。这种特性在其他情况下可能会成为线程安全问题,但在 PHP 中这不是问题。

注意事项

  • 由于大多数回调都是延迟评估的,随着更多数据的进出,你必须使用普通的 foreach 或使用 reduce() 来确保处理发生。

    foreach ($pipeline as $result) {
        // Processing happens only if you consume the results.
        // Want to stop early after few results? Not a problem here!
    }

    除非使用结果,否则几乎不会发生任何事情。这正是延迟评估的意义所在!

  • 尽管如此,如果用于初始化管道的非生成器,它将被立即执行。

    $pipeline = new \Pipeline\Standard();
    $pipeline->map(function () {
        // will be executed immediately on the spot, unless yield is used
        return $this->veryExpensiveMethod();
    })->filter();

    在上面的情况下,管道将内部存储一个数组,并且当可能时,管道将积极地操作该数组。因此,当有疑问时,请使用生成器。

    $pipeline->map(function () {
        // will be executed only as needed, when needed
        yield $this->veryExpensiveMethod();
    })->filter();
  • 在最佳努力的基础上保留生成的值的键,因此在使用 iterator_to_array() 处理管道时必须小心:具有重复键的值将被丢弃,并且对于给定的键,只返回最后一个值。

    $pipeline = \Pipeline\map(function () {
        yield 'foo' => 'bar';
        yield 'foo' => 'baz';
    });
    
    var_dump(iterator_to_array($pipeline));
    /* ['foo' => 'baz'] */

    更安全的方法是使用提供的 toArray() 方法。它会返回所有值,无论使用哪些键,确保在过程中丢弃所有键。

    var_dump($pipeline->toArray());
    /* ['bar', 'baz'] */

    此方法还接受一个可选参数以保留键。

  • 结果管道是一个迭代器,应该假定它不可重置,就像它使用的生成器一样。

     $pipeline = \Pipeline\map(function () {
         yield 1;
     });
     
     $sum = $pipeline->reduce();
     
     // Won't work the second time though
     $pipeline->reduce();
     // Exception: Cannot traverse an already closed generator

    尽管在某些情况下,管道可以被重置并像常规数组一样重复使用,但用户不应对此行为做出任何假设,因为它不是API兼容性保证的一部分。

  • 管道实现了 IteratorAggregate,这与 Iterator 不同。在需要后者的情况下,可以将管道包装在 IteratorIterator 中。

    $iterator = new \IteratorIterator($pipeline);
    /** @var $iterator \Iterator */
  • 再次遍历管道将导致未定义的行为。最好避免这样做。

类和接口:概述

  • \Pipeline\Standard 是管道的主要用户界面类,对于大多数方法提供了合理的默认值。

这个库是为了持久使用而构建的。没有抛出异常的地方。不要考虑任何断言。

方法

__construct()

接受 Traversable 的实例或无实例。在后一种情况下,管道必须通过将初始生成器传递给 map 方法来初始化。

$pipeline->map()

接受一个以生成器函数或纯映射函数形式存在的处理阶段。如果没有提供回调,则不执行任何操作。

$pipeline->map(function (Customer $customer) {
    foreach ($customer->allPayments() as $item) {
        yield $item;
    }
});

// Now process all and every payment
$pipeline->map(function (Payment $payment) {
    return $payment->amount;
});

也可以接受一个初始生成器,但该生成器不得需要任何参数。

$pipeline = new \Pipeline\Standard();
$pipeline->map(function () {
    yield $this->foo;
    yield $this->bar;
});

$pipeline->flatten()

展开输入

$pipeline->map(function () {
    yield [1];
    yield [2, 3];
})->unpack()->toArray();
// [1, 2, 3]

$pipeline->unpack()

map 的一个额外变体,它将数组解包为回调的参数。

使用 map() 时,你会这样做:

$pipeline->map(function ($args) {
    list ($a, $b) = $args;

    // and so on
});

使用 unpack() 时,这些操作会由幕后为你完成

$pipeline->map(function () {
    yield [-1, [10, 20], new DateTime()];
});
$pipeline->unpack(function ($a, array $b, \DateTime ...$dates) {
    // and so on
});

你还可以轻松进行各种标准类型检查。

如果没有回调,unpack() 的默认回调将像 flatten() 一样展开输入。

$pipeline->cast()

map 类似工作,但不为生成器提供特殊处理。想想 array_map

$pipeline->cast(function (Customer $customer) {
    foreach ($customer->allPayments() as $item) {
        yield $item;
    }
});

$pipeline->map(function (\Generator $paymentGenerator) {
    // Keeps grouping as per customer
});

在这个例子中,map() 将填充管道的一系列付款,而 cast() 将为每个客户添加一个生成器。

$pipeline->zip()

将多个可迭代对象连接在一起,形成一个元素并排的饲料。

$pipeline = take($iterableA);
$pipeline->zip($iterableB, $iterableC);
$pipeline->unpack(function ($elementOfA, $elementOfB, $elementOfC) {
    // ... 
});

对于元素数量不等的迭代器,缺失的元素将保留为 null。

$pipeline->filter()

接受一个类似于 array_filter 的过滤器回调。

$pipeline->filter(function ($item) {
    return $item->isGood() && $item->amount > 0;
});

管道具有默认回调,其效果与 array_filter 相同:它会移除所有假值。

$pipeline->slice()

接受偏移量和长度参数,其工作方式与 array_slice 非常相似,其中 $preserve_keys 设置为 true。

$pipeline->slice(1, -1);

此示例将删除序列的第一和最后一个元素。

实现使用滚动窗口缓冲区处理负偏移量和长度值,并在输入数组上回退到普通的 array_slice

$pipeline->reduce()

接受一个类似于 array_reduce 的减少回调,该回调有两个参数,用于上一迭代的值和当前项。作为第二个参数,它可以接受一个初始值。

$total = $pipeline->reduce(function ($curry, $item) {
    return $curry + $item->amount;
}, 0);

管道具有默认回调,它将求和所有值。

$pipeline->toArray()

返回包含管道中所有值的数组。所有数组键都被忽略,以确保返回每个值。

// Yields [0 => 1, 1 => 2]
$pipeline = map(function () {
    yield 1;
    yield 2;
});

// For each value yields [0 => $i + 1, 1 => $i + 2]
$pipeline->map(function ($i) {
    yield $i + 1;
    yield $i + 2;
});

$result = $pipeline->toArray();
// Since keys are ignored we get:
// [2, 3, 3, 4]

如果在示例中使用 iterator_to_array($result),他们将得到 [3, 4]

$pipeline->each()

使用提供的回调急切地遍历序列。

$pipeline->each(function ($i) {
    $this->log("Saw $i");
});

除非由第二个参数指示,否则在迭代后丢弃序列。

$pipeline->getIterator()

实现符合 Traversable 接口的方法。如果使用未经初始化的 \Pipeline\Standard,它将返回一个空数组迭代器,本质上是一个无操作管道。因此,这应该不会出错。

$pipeline = new \Pipeline\Standard();
foreach ($pipeline as $value) {
    // no errors here
}

这允许在没有任何结果返回的情况下跳过返回值的类型检查:而不是返回 falsenull,安全地返回一个未经初始化的管道。

$pipeline->runningVariance()

计算序列的在线统计信息:计数、样本均值、样本方差、标准差。您可以使用如 getCount()getMean()getVariance()getStandardDeviation() 等方法实时访问这些数字。

此方法还接受一个可选的强制转换回调,该回调应返回 float|null:丢弃 null 值。因此,您可以计算来自数据不同部分的多个运行方差。

$pipeline->runningVariance($varianceForShippedOrders, static function (order $order): ?float {
    if (!$order->isShipped()) {
        // This order will be excluded from the computation.
        return null;
    }

    return $order->getTotal();
});

$pipeline->runningVariance($varianceForPaidOrders, static function (order $order): ?float {
    if ($order->isUnpaid()) {
        // This order will be excluded from the computation.
        return null;
    }

    return $order->getProjectedTotal();
});

在处理订单时,您将能够访问 $varianceForShippedOrders->getMean()$varianceForPaidOrders->getMean()

此计算使用 Welford 的在线算法,因此可以处理非常大的数据点数量。

$pipeline->finalVariance()

一个方便的方法来计算序列的最终统计数据。接受一个可选的强制转换方法,否则假定序列包含有效的数字。

// Fibonacci numbers generator
$fibonacci = map(function () {
    yield 0;

    $prev = 0;
    $current = 1;

    while (true) {
        yield $current;
        $next = $prev + $current;
        $prev = $current;
        $current = $next;
    }
});

// Statistics for the second hundred Fibonacci numbers.
$variance = $fibonacci->slice(101, 100)->finalVariance();

$variance->getStandardDeviation();
// float(3.5101061922557E+40)

$variance->getCount();
// int(100)

贡献

欢迎对文档和测试用例的贡献。也欢迎错误报告。

尽管如此,API 预计将保持其简单性。

关于集合管道的一般信息

关于 Martin Fowler 的 集合管道编程模式

在更广泛的意义上,这个库实现了 CSP(通信顺序进程)范式的子集,而不是 Actor 模型。

还有什么

  • Hack 的管道运算符与此类似,但不会用于生成器,并且在常规 PHP 中也不工作。查看 JavaScript 中类似运算符的提案。
  • nikic/iter 提供了如 array_map 等函数,但返回的是惰性生成器。您需要相当多的粘合剂才能做到与 Pipeline 相同的事情,更不用说缺少的功能。
  • League\Pipeline 仅适用于单个值。名称相似,但用途完全不同。不打算与值序列一起工作。每个阶段只能返回一个值。
  • Illuminate\Support\Collection 是一个用于处理数据数组的流畅包装器。只能与数组一起工作,也是预期为仅数组包装器的不可变。
  • Knapsack 是一个相似的选择。可以接受可遍历的输入,具有惰性评估。但不能从单个输入中生成多个值。为需要它们的人提供了大量的实用函数:它们超出了此项目的范围。
  • transducers.php 如果您已经熟悉 Clojure 中的 transducers,则值得仔细查看。API 并非非常 PHP 风格。阅读时可能不太友好。作者详细的撰写。
  • Lars Strojny 等人编写的 PHP 函数的原语旨在补充现有的 PHP 函数,确实如此,尽管它也受到了与 array_maparray_filter 相同的一些局限。没有方法链。
  • Chain 提供了一种一致且可链式的方式来在 PHP 中处理数组,但仅限于数组。不支持延迟评估。
  • Hugh Grigg 的使用 PHP 生成器实现简单管道。对一个非常接近的概念进行了论证和解释。可能可以将这个库作为方法名不同的直接替换项使用。
  • 就处理多吉字节大小的日志文件而言,loophp 的 Collection 看起来是这个库的一个可行的替代品。它支持流畅的接口。支持不可变性作为第一原则,尽管 PHP 的生成器本质上是可以变的。
  • 如果你熟悉 Java,java.util.stream 包 提供了相同概念的实现。
  • Scala 的集合类型

提交一个 PR 添加你的内容。

更多徽章

License FOSSA Status