amphp/pipeline

异步迭代器和运算符。

资助包维护!
amphp

v1.2.1 2024-07-04 00:56 UTC

README

AMPHP 是一个 PHP 事件驱动库集合,它考虑了纤程和并发性。 amphp/pipeline 提供了并发迭代器和集合运算符。

安装

此包可以作为 Composer 依赖项安装。

composer require amphp/pipeline

要求

此包需要 PHP 8.1 或更高版本。

使用方法

使用基于纤程的协程,现在可以使用 PHP 内置的 Iterator 在单个纤程中创建和消费异步集合。但是,尝试从多个纤程中消费 Iterator 实例可能会有问题,因为一个纤程可能会在另一个纤程挂起时修改迭代器的状态。

此库提供了一个 ConcurrentIterator 接口,它提供了一个线程安全的迭代器,可以被多个纤程同时消费,以及创建异步集合的工具。

并发迭代器

可以使用 ConcurrentIterator 替代 Iterator,这意味着它可以与 foreachyield fromiterator_to_array()、参数解包等一起使用!

Iterator 一样,ConcurrentIterator 也可以手动迭代,具有用于前进和检索当前值的单独方法。

use Amp\Pipeline\Pipeline;

// Pipeline::getIterator() returns a ConcurrentIterator
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
while ($concurrentIterator->continue()) {
    $position = $concurrentIterator->getPosition();
    $value = $concurrentIterator->getValue();

    // ...
}

// Equivalently, multiple fibers may consume a single ConcurrentIterator
// instance using foreach.
$concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator();
foreach ($concurrentIterator as $position => $value) {
    // ...
}

continue() 暂停当前纤程,直到有值可用或迭代器完成,分别返回 truefalse。如果迭代器的源在生成下一个值时抛出异常,则会从 continue() 抛出异常。

getValue() 返回当前纤程中迭代器上发出的最后一个值。此函数的返回值将在当前纤程中不会改变,直到再次调用 continue()。必须在调用此方法之前调用并返回 continue()

getPosition() 返回当前迭代器中的当前 0 索引位置。如果从多个纤程中消费,此值在单个纤程中可能不是顺序的。类似于 getValue(),必须在调用此方法之前调用并返回 continue()

注意 通常不需要在应用程序代码中直接调用这些方法。并发迭代器通常应与 foreach 一起使用。

队列

Queue 用于创建一个异步值集合,具有等待消费生成值的 Ability,提供更多的反压,以便消费和生成可以同步。

可以通过两种方式将值添加到 Queue 中。

  • push() 将值添加到队列中,只有当值从队列中消费后才会返回。
  • pushAsync() 将值添加到队列中,立即返回一个 Future,该 Future 仅在值从队列中消费后才会完成。
use Amp\Pipeline\Queue;
use function Amp\async;
use function Amp\delay;

$queue = new Queue();

$start = \microtime(true);
$elapsed = fn () => \microtime(true) - $start;

// Generate values in a separate fiber
async(function () use ($queue, $elapsed): void {
    printf("Starting production loop at %.3fs\n", $elapsed());

    foreach (range(1, 10) as $value) {
        delay(0.1); // Production of a value takes between 100ms
        $queue->push($value);
    }

    printf("Completing production loop at %.3fs\n", $elapsed());

    // Queue must be completed, otherwise foreach loop below will never exit!
    $queue->complete();
});

foreach ($queue->iterate() as $value) {
    printf("Iterator yielded %d at %.3fs\n", $value, $elapsed());
    delay(0.5); // Listener consumption takes 500 ms
}

将所有值推送到 Queue 后,生产者必须调用 complete() 来结束并发迭代器。如果不这样做,消费者将无限期地挂起。或者,生产者可以使用 error() 抛出异常给并发迭代器消费者并结束并发迭代器,以指示错误。

DisposedException

如果由Queue生成的并发迭代器的消费者被销毁,push()将抛出DisposedException异常(或者从pushAsync()返回的未来将因DisposedException异常而失败)。这表明不需要生成更多值,因为对这些值的消费已经结束。如果出于某种原因生产者想要继续(例如,从缓冲区中读取字节),则可以捕获该异常或忽略该未来。(每个队列只创建一个DisposedException实例。)

管道

Pipeline表示一个异步集合,并提供可以应用于集合的操作。

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$pipeline = $pipeline
    ->concurrent(10) // Process up to 10 items concurrently
    ->unordered() // Results may be consumed eagerly and out of order
    ->tap(fn () => delay(random_int(1, 10) / 10)) // Observe each value with a delay for 0.1 to 1 seconds, simulating I/O
    ->map(fn (int $input) => $input * 10) // Apply an operation to each value
    ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3

foreach ($pipeline as $value) {
    echo $value, "\n";
}

或者,Pipeline还包含消耗集合的方法,如forEach()reduce(),这些方法仅在集合完成或抛出异常后才会返回。

use Amp\Pipeline\Pipeline;

Pipeline::generate(function (): int { static $v = 0; return ++$v; })
    ->take(10) // Take only 10 values from the generation function.
    ->concurrent(3) // Process 3 values concurrently
    ->delay(1) // Delay for 1 second to simulate I/O
    ->forEach(function (int $value): void {
        echo $value, "\n";
    });

版本

amphp/pipeline遵循与所有其他amphp包相同的semver语义版本规范。

安全

如果您发现任何安全相关的问题,请通过电子邮件[email protected]而不是使用问题跟踪器来报告。

许可证

MIT许可证(MIT)。有关更多信息,请参阅LICENSE