amphp / pipeline
异步迭代器和运算符。
Requires
- php: >=8.1
- amphp/amp: ^3
- revolt/event-loop: ^1
Requires (Dev)
- amphp/php-cs-fixer-config: ^2
- amphp/phpunit-util: ^3
- phpunit/phpunit: ^9
- psalm/phar: ^5.18
This package is auto-updated.
Last update: 2024-09-04 01:18:09 UTC
README
AMPHP 是一个 PHP 事件驱动库集合,它考虑了纤程和并发性。 amphp/pipeline
提供了并发迭代器和集合运算符。
安装
此包可以作为 Composer 依赖项安装。
composer require amphp/pipeline
要求
此包需要 PHP 8.1 或更高版本。
使用方法
使用基于纤程的协程,现在可以使用 PHP 内置的 Iterator
在单个纤程中创建和消费异步集合。但是,尝试从多个纤程中消费 Iterator
实例可能会有问题,因为一个纤程可能会在另一个纤程挂起时修改迭代器的状态。
此库提供了一个 ConcurrentIterator
接口,它提供了一个线程安全的迭代器,可以被多个纤程同时消费,以及创建异步集合的工具。
并发迭代器
可以使用 ConcurrentIterator
替代 Iterator
,这意味着它可以与 foreach
、yield from
、iterator_to_array()
、参数解包等一起使用!
与 Iterator
一样,ConcurrentIterator
也可以手动迭代,具有用于前进和检索当前值的单独方法。
use Amp\Pipeline\Pipeline; // Pipeline::getIterator() returns a ConcurrentIterator $concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator(); while ($concurrentIterator->continue()) { $position = $concurrentIterator->getPosition(); $value = $concurrentIterator->getValue(); // ... } // Equivalently, multiple fibers may consume a single ConcurrentIterator // instance using foreach. $concurrentIterator = Pipeline::fromIterable([1, 2, 3])->getIterator(); foreach ($concurrentIterator as $position => $value) { // ... }
continue()
暂停当前纤程,直到有值可用或迭代器完成,分别返回 true
或 false
。如果迭代器的源在生成下一个值时抛出异常,则会从 continue()
抛出异常。
getValue()
返回当前纤程中迭代器上发出的最后一个值。此函数的返回值将在当前纤程中不会改变,直到再次调用 continue()
。必须在调用此方法之前调用并返回 continue()
。
getPosition()
返回当前迭代器中的当前 0 索引位置。如果从多个纤程中消费,此值在单个纤程中可能不是顺序的。类似于 getValue()
,必须在调用此方法之前调用并返回 continue()
。
注意 通常不需要在应用程序代码中直接调用这些方法。并发迭代器通常应与
foreach
一起使用。
队列
Queue
用于创建一个异步值集合,具有等待消费生成值的 Ability,提供更多的反压,以便消费和生成可以同步。
可以通过两种方式将值添加到 Queue
中。
push()
将值添加到队列中,只有当值从队列中消费后才会返回。pushAsync()
将值添加到队列中,立即返回一个Future
,该Future
仅在值从队列中消费后才会完成。
use Amp\Pipeline\Queue; use function Amp\async; use function Amp\delay; $queue = new Queue(); $start = \microtime(true); $elapsed = fn () => \microtime(true) - $start; // Generate values in a separate fiber async(function () use ($queue, $elapsed): void { printf("Starting production loop at %.3fs\n", $elapsed()); foreach (range(1, 10) as $value) { delay(0.1); // Production of a value takes between 100ms $queue->push($value); } printf("Completing production loop at %.3fs\n", $elapsed()); // Queue must be completed, otherwise foreach loop below will never exit! $queue->complete(); }); foreach ($queue->iterate() as $value) { printf("Iterator yielded %d at %.3fs\n", $value, $elapsed()); delay(0.5); // Listener consumption takes 500 ms }
将所有值推送到 Queue
后,生产者必须调用 complete()
来结束并发迭代器。如果不这样做,消费者将无限期地挂起。或者,生产者可以使用 error()
抛出异常给并发迭代器消费者并结束并发迭代器,以指示错误。
DisposedException
如果由Queue
生成的并发迭代器的消费者被销毁,push()
将抛出DisposedException
异常(或者从pushAsync()
返回的未来将因DisposedException
异常而失败)。这表明不需要生成更多值,因为对这些值的消费已经结束。如果出于某种原因生产者想要继续(例如,从缓冲区中读取字节),则可以捕获该异常或忽略该未来。(每个队列只创建一个DisposedException
实例。)
管道
Pipeline
表示一个异步集合,并提供可以应用于集合的操作。
use Amp\Pipeline\Pipeline; use function Amp\delay; $pipeline = Pipeline::fromIterable(function (): \Generator { for ($i = 0; $i < 100; ++$i) { yield $i; } }); $pipeline = $pipeline ->concurrent(10) // Process up to 10 items concurrently ->unordered() // Results may be consumed eagerly and out of order ->tap(fn () => delay(random_int(1, 10) / 10)) // Observe each value with a delay for 0.1 to 1 seconds, simulating I/O ->map(fn (int $input) => $input * 10) // Apply an operation to each value ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3 foreach ($pipeline as $value) { echo $value, "\n"; }
或者,Pipeline
还包含消耗集合的方法,如forEach()
或reduce()
,这些方法仅在集合完成或抛出异常后才会返回。
use Amp\Pipeline\Pipeline; Pipeline::generate(function (): int { static $v = 0; return ++$v; }) ->take(10) // Take only 10 values from the generation function. ->concurrent(3) // Process 3 values concurrently ->delay(1) // Delay for 1 second to simulate I/O ->forEach(function (int $value): void { echo $value, "\n"; });
版本
amphp/pipeline
遵循与所有其他amphp
包相同的semver语义版本规范。
安全
如果您发现任何安全相关的问题,请通过电子邮件[email protected]
而不是使用问题跟踪器来报告。
许可证
MIT许可证(MIT)。有关更多信息,请参阅LICENSE
。