miquido / observable
Observable library
v1.0.0
2018-09-26 09:28 UTC
Requires
- php: ^7.1.3
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.13
- phpstan/phpstan: ^0.10.3
- phpstan/phpstan-phpunit: ^0.10.0
- phpunit/phpunit: ^7.0
This package is not auto-updated.
Last update: 2024-09-20 23:34:00 UTC
README
Observable
A collection of classes for working with data streams.
Installation
Install this package with Composer:
composer require miquido/observable
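Code examples
See also the miquido/csv-file-reader library for more real-world examples.
Creating and subscribing to a data stream
You can start with the simple Miquido\Observable\Stream\FromArray::create method.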
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Observer;

// Create an observable stream from an array.
// $stream is an object that implements Miquido\Observable\ObservableInterface.
$stream = FromArray::create([1, 2, 3, 4, 5]);

// Then subscribe to the stream using an Observer (both parameters are optional):
$stream->subscribe(new Observer(
    function (int $i): void { /* called 5 times, with 1, 2, 3, 4, 5 in turn */ },
    function (): void { /* called once, after every item in the stream has been emitted */ }
));

// Alternatively, if you are only interested in the items, pass just a callback:
$stream->subscribe(function (int $i): void {
    // do something with the numbers
});
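Transforming data in the stream with operators
Operators are useful when you want to process the data in a stream before observers are notified. Operators do not affect the source stream; each operator returns a new stream that can be subscribed to independently.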
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$squareStream = $stream->pipe(new Operator\Map(function (int $i): int {
    return $i * $i;
}));
$sumStream = $stream->pipe(new Operator\Sum());
$squareSumStream = $squareStream->pipe(new Operator\Sum());

// cubes of the source numbers, keeping only those not divisible by 3
$filteredCubeStream = $stream
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    ->pipe(new Operator\Filter(function (int $i): bool {
        return $i % 3 > 0;
    }));

$squareStream->subscribe(function ($i) {});    // called 5 times, with 1, 4, 9, 16, 25 in turn
$squareSumStream->subscribe(function ($i) {}); // called once, with 55
$sumStream->subscribe(function ($i) {});       // called once, with 15
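The values quoted in the comments above can be cross-checked without the library. The following is a minimal sketch in plain PHP that assumes only built-in array functions. It mirrors what the Map, Sum and Filter operators are described as emitting for this input, and it is not how the library is implemented. The variable names are illustrative. It also shows the independence described earlier: each derived array is a new value and the source array stays untouched.

```php
<?php

declare(strict_types=1);

// Plain-PHP stand-in for the source stream (not the library's API).
$source = [1, 2, 3, 4, 5];

// Counterpart of Operator\Map with the squaring callback.
$squares = array_map(function (int $i): int {
    return $i * $i;
}, $source);

// Counterpart of Operator\Sum on the source and on the squares.
$sum = array_sum($source);
$squareSum = array_sum($squares);

// Counterpart of Map (cube) followed by Filter (keep values not divisible by 3).
$cubes = array_map(function (int $i): int {
    return $i ** 3;
}, $source);
// array_values() re-indexes, because array_filter() preserves the original keys.
$filteredCubes = array_values(array_filter($cubes, function (int $i): bool {
    return $i % 3 > 0;
}));

echo implode(', ', $squares), PHP_EOL;       // 1, 4, 9, 16, 25
echo $sum, ' ', $squareSum, PHP_EOL;         // 15 55
echo implode(', ', $filteredCubes), PHP_EOL; // 1, 8, 64, 125
```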
You can also chain multiple pipes:
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5, 6]);

$stream
    // the first pipe raises each number to the power of 3
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    // BufferCount(3) then receives the numbers 1, 8, 27, 64, 125, 216;
    // it holds items back until it has received 3 values, then releases them as one array
    ->pipe(new Operator\BufferCount(3))
    // the next pipe receives two arrays of three numbers, [1, 8, 27] and [64, 125, 216],
    // and returns the sum of each group
    ->pipe(new Operator\Map(function (array $numbers): int {
        return array_sum($numbers);
    }))
    ->subscribe(function (int $i): void {
        // called twice, with 36 and 405
    });
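The arithmetic of this pipeline can be traced step by step in plain PHP. This is a sketch under the assumption that, for an input whose length is a multiple of the buffer size, BufferCount behaves like the built-in array_chunk. How the library treats a trailing partial buffer is not stated here and is not modelled.

```php
<?php

declare(strict_types=1);

$source = [1, 2, 3, 4, 5, 6];

// Step 1: cube every number (counterpart of the first Map).
$cubes = array_map(function (int $i): int {
    return $i ** 3;
}, $source);

// Step 2: group into arrays of three (counterpart of BufferCount(3)).
$groups = array_chunk($cubes, 3);

// Step 3: sum each group (counterpart of the second Map).
$sums = array_map(function (array $numbers): int {
    return array_sum($numbers);
}, $groups);

echo json_encode($groups), PHP_EOL; // [[1,8,27],[64,125,216]]
echo implode(', ', $sums), PHP_EOL; // 36, 405
```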
Using a Subject
A Subject is both an observer and an observable. Here is an example:
<?php

use Miquido\Observable\Subject\Subject;
use Miquido\Observable\Operator;

// Let's create a Subject.
$words = new Subject();

// Because it is an observable, you can pipe and subscribe to the data.
$words
    ->pipe(new Operator\Map(function (string $word): string {
        return \strtoupper($word);
    }))
    ->subscribe(function (string $word): void {
        // receives upper-cased words
    });

$words
    ->pipe(new Operator\Map('ucfirst'))
    ->pipe(new Operator\Reduce(
        function (string $sentenceInProgress, string $word) {
            return \sprintf('%s %s', $sentenceInProgress, $word);
        },
        ''
    ))
    ->pipe(new Operator\Map('trim'))
    ->subscribe(function (string $sentence): void {
        // receives a sentence made of all the words in the stream
    });

// And because a Subject is also an observer, you can push new items to the stream.
$words->next('lorem');
$words->next('ipsum');
$words->next('dolor');
$words->next('sit');
$words->next('amet');

// complete() sends a "complete" notification to all observers and removes them from the subject.
$words->complete();

/**
 * In this example:
 * - the first subscriber receives 5 items: 'LOREM', 'IPSUM', 'DOLOR', 'SIT' and 'AMET'
 * - the second subscriber receives one item: 'Lorem Ipsum Dolor Sit Amet'
 */
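To make the "observer and observable at once" idea concrete, here is a minimal, self-contained sketch. TinySubject is a hypothetical class written for this illustration and is not the library's Subject. It only models subscribing, pushing items with next(), and dropping subscribers on complete(), as described in the comments above.

```php
<?php

declare(strict_types=1);

// Hypothetical, simplified class for illustration only; not the library's Subject.
final class TinySubject
{
    /** @var callable[] */
    private $onNext = [];
    /** @var callable[] */
    private $onComplete = [];

    // Observable side: register callbacks.
    public function subscribe(callable $next, ?callable $complete = null): void
    {
        $this->onNext[] = $next;
        if ($complete !== null) {
            $this->onComplete[] = $complete;
        }
    }

    // Observer side: push an item to every subscriber.
    public function next($value): void
    {
        foreach ($this->onNext as $callback) {
            $callback($value);
        }
    }

    // Observer side: notify completion, then drop all subscribers.
    public function complete(): void
    {
        foreach ($this->onComplete as $callback) {
            $callback();
        }
        $this->onNext = [];
        $this->onComplete = [];
    }
}

$words = new TinySubject();
$upper = [];
$sentence = '';

$words->subscribe(function (string $word) use (&$upper): void {
    $upper[] = strtoupper($word);
});
$words->subscribe(
    function (string $word) use (&$sentence): void {
        $sentence .= ' ' . ucfirst($word);
    },
    function () use (&$sentence): void {
        $sentence = trim($sentence);
    }
);

foreach (['lorem', 'ipsum', 'dolor', 'sit', 'amet'] as $word) {
    $words->next($word);
}
$words->complete();
$words->next('ignored'); // no subscribers remain after complete()

echo implode(' ', $upper), PHP_EOL; // LOREM IPSUM DOLOR SIT AMET
echo $sentence, PHP_EOL;            // Lorem Ipsum Dolor Sit Amet
```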
List of built-in operators
ArrayCount operator
Converts each array item into the number of elements it contains.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5]
]);

$stream
    ->pipe(new Operator\ArrayCount())
    ->subscribe(function (int $count): void {
        // called twice with values: 2 and 3
    });
BufferCount operator
Groups individual items into arrays of the given size.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5, 6]);

$stream
    ->pipe(new Operator\BufferCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });
BufferUniqueCount operator
Similar to BufferCount, but removes duplicates.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);

$stream
    ->pipe(new Operator\BufferUniqueCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });
Count operator
Counts all items emitted in the stream.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);

$stream
    ->pipe(new Operator\Count())
    ->subscribe(function (int $count): void {
        // called once with value: 9
    });
Filter operator
Removes all values for which the provided callback returns false.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$stream
    ->pipe(new Operator\Filter(function (int $number): bool {
        return $number % 2 === 0;
    }))
    ->subscribe(function (int $number): void {
        // called twice with values: 2, 4
    });
Flat operator
If the items in the stream are arrays, Flat converts each array into a sequence of single items.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5]
]);

$stream
    ->pipe(new Operator\Flat())
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });
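For comparison, the same one-level flattening can be written in plain PHP with array_merge and the spread operator. This is only a sketch of the resulting values. Whether the library's Flat operator handles non-array items or deeper nesting is not covered here.

```php
<?php

declare(strict_types=1);

$source = [
    [1, 2],
    [3, 4, 5],
];

// Flatten one level: every inner array is unpacked into a single list.
$flat = array_merge(...$source);

echo implode(', ', $flat), PHP_EOL; // 1, 2, 3, 4, 5
```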
Let operator
Does nothing except invoke the provided callback for each item in the stream, then passes the value on unchanged.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$stream
    ->pipe(new Operator\Let(function (int $number): void {
        // do something with this number, no need to return anything
    }))
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });
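The "side effect only, value unchanged" behaviour described for Let can be sketched in plain PHP. The $tap helper below is hypothetical and exists only for this illustration. It wraps a callback so that the callback runs for each value while the value itself is returned untouched.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: runs $sideEffect for a value, then returns the value as-is.
$tap = function (callable $sideEffect): callable {
    return function ($value) use ($sideEffect) {
        $sideEffect($value); // whatever the callback returns is ignored
        return $value;
    };
};

$seen = [];
$result = array_map(
    $tap(function (int $number) use (&$seen): void {
        $seen[] = $number * 10; // some side effect, e.g. logging
    }),
    [1, 2, 3, 4, 5]
);

echo implode(', ', $result), PHP_EOL; // 1, 2, 3, 4, 5
echo implode(', ', $seen), PHP_EOL;   // 10, 20, 30, 40, 50
```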
Map operator
Transforms each item in the stream into a new value.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create(['lorem', 'ipsum', 'dolor', 'sit', 'amet']);

$stream
    ->pipe(new Operator\Map(function (string $word): int {
        return \strlen($word);
    }))
    ->subscribe(function (int $length): void {
        // called 5 times with values: 5, 5, 5, 3, 4
    });
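Reduce operator
Reduces all items in the stream to a single value, using the provided callback and initial value.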
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$stream
    ->pipe(new Operator\Reduce(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });
Scan operator
Similar to Reduce, but observers receive the accumulated value after every item rather than only once at the end.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$stream
    ->pipe(new Operator\Scan(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called 5 times with values: 1, 3, 6, 10, 15
    });
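The difference between Reduce and Scan is easiest to see side by side. The sketch below uses plain PHP rather than the library: array_reduce stands in for Reduce, and a loop that records every intermediate accumulator stands in for Scan. It illustrates the emitted values only, not the library's implementation.

```php
<?php

declare(strict_types=1);

$source = [1, 2, 3, 4, 5];
$add = function (int $sum, int $number): int {
    return $sum + $number;
};

// Reduce-like: only the final accumulated value is produced.
$reduced = array_reduce($source, $add, 0);

// Scan-like: the accumulator is recorded after every item.
$scanned = [];
$accumulator = 0;
foreach ($source as $number) {
    $accumulator = $add($accumulator, $number);
    $scanned[] = $accumulator;
}

echo $reduced, PHP_EOL;                // 15
echo implode(', ', $scanned), PHP_EOL; // 1, 3, 6, 10, 15
```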
Sum operator
Adds up all items in the stream.
<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$stream
    ->pipe(new Operator\Sum())
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });
Contributing
Pull requests, bug fixes and issue reports are welcome. Before proposing a change, please discuss it by creating an issue.