miquido/observable

Observable library

v1.0.0 2018-09-26 09:28 UTC


Last update: 2024-09-20 23:34:00 UTC


MIT Licence

Observable

A collection of classes for working with data streams.

Installation

Use Composer to install this package:

composer require miquido/observable

Code samples

See also the miquido/csv-file-reader library for more real-life examples.

Creating and subscribing to a data stream

You can start with the simple Miquido\Observable\Stream\FromArray::create method.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Observer;

// create an observable stream from an array:
// $stream is an object that implements Miquido\Observable\ObservableInterface
$stream = FromArray::create([1, 2, 3, 4, 5]);

// then you can subscribe to the stream using Observer (both parameters are optional):
$stream->subscribe(new Observer(
    function (int $i): void { /* called 5 times, with the consecutive values 1, 2, 3, 4, 5 */ },
    function (): void { /* called once, after every item in the stream has been emitted */ }
));

// alternatively - if you are only interested in items in the stream you can pass just a callback 
$stream->subscribe(function (int $i): void {
    // do something with numbers
});

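Using operators to transform data in a stream

Operators are useful when you want to process the data in a stream before observers are notified. Operators do not modify the source stream; each operator returns a new stream that can be subscribed to independently.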

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);

$squareStream = $stream->pipe(new Operator\Map(function (int $i): int {
    return $i * $i;
}));

$sumStream = $stream->pipe(new Operator\Sum());
$squareSumStream = $squareStream->pipe(new Operator\Sum());
// cubes of the numbers, keeping only those not divisible by 3: 1, 8, 64, 125
$filteredCubeStream = $stream
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    ->pipe(new Operator\Filter(function (int $i): bool {
        return $i % 3 > 0;
    }));

$squareStream->subscribe(function ($i) {}); // called 5 times with consecutive: 1, 4, 9, 16, 25
$squareSumStream->subscribe(function ($i) {}); // called once with a number 55
$sumStream->subscribe(function ($i) {}); // called once with a number 15

You can also chain multiple pipes:

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;


$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    // the first pipe raises each number to the power of 3,
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    // then BufferCount(3) receives a stream of numbers: 1, 8, 27, 64, 125, 216
    // it holds the stream until it has received 3 values, then releases an array with those three values
    ->pipe(new Operator\BufferCount(3))
    // the next pipe receives two arrays of three numbers, [1, 8, 27] and [64, 125, 216], and returns the sum of each group
    ->pipe(new Operator\Map(function (array $numbers): int {
        return array_sum($numbers);
    }))
    ->subscribe(function (int $i): void {
        // this callback is called twice, with the numbers 36 and 405
    });
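
As a cross-check of the numbers in the comments above, the same calculation can be written eagerly with PHP's built-in array functions. This is only a sketch in plain PHP and does not use the library: array_chunk stands in for BufferCount(3), which matches here because the input length is a multiple of 3.

```php
<?php

declare(strict_types=1);

// Plain-PHP equivalent of the pipeline above (no streams involved).
$numbers = [1, 2, 3, 4, 5, 6];

// Same as the first Map: raise each number to the power of 3.
$cubes = array_map(function (int $i): int {
    return $i ** 3;
}, $numbers);

// Stand-in for BufferCount(3): split the values into groups of three.
$groups = array_chunk($cubes, 3);

// Same as the second Map: sum each group.
$sums = array_map('array_sum', $groups);

print_r($sums); // 36 and 405
```

Unlike the stream version, this computes everything up front, so it only illustrates the arithmetic, not the push-based delivery to observers.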

Using a Subject

A Subject is both an observer and an observable. Here is an example:

<?php

use Miquido\Observable\Subject\Subject;
use Miquido\Observable\Operator;

// let's create a Subject
$words = new Subject();

// because it is an observable, you can pipe and subscribe to the data
$words
    ->pipe(new Operator\Map(function (string $word): string {
        return \strtoupper($word);
    }))
    ->subscribe(function (string $word): void {
        // receives upper cased words
    });

$words
    ->pipe(new Operator\Map('ucfirst'))
    ->pipe(new Operator\Reduce(
        function (string $sentenceInProgress, string $word) {
            return \sprintf('%s %s', $sentenceInProgress, $word);
        },
        ''
    ))
    ->pipe(new Operator\Map('trim'))
    ->subscribe(function (string $sentence): void {
        // receives a sentence made of all the words in the stream
    });

// And because a Subject is also an observer, you can push new items to the stream.
$words->next('lorem');
$words->next('ipsum');
$words->next('dolor');
$words->next('sit');
$words->next('amet');

// complete will send a "complete" notification to all observers and will remove observers from the subject
$words->complete();

/**
 * In this example:
 * - first subscriber will receive 5 items: 'LOREM', 'IPSUM', 'DOLOR', 'SIT' and 'AMET'
 * - second subscriber will receive one item: 'Lorem Ipsum Dolor Sit Amet'
 */
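
To make the "both observer and observable" idea concrete, here is a minimal sketch of the pattern in plain PHP. TinySubject is a hypothetical class written for this illustration; it is not part of miquido/observable and says nothing about how the library's Subject is implemented. It only mirrors the behavior described in the comments above: next() forwards items to subscribers, and complete() notifies them once and then removes them.

```php
<?php

declare(strict_types=1);

// Hypothetical class for illustration only; not part of miquido/observable.
final class TinySubject
{
    /** @var callable[] */
    private $onNext = [];

    /** @var callable[] */
    private $onComplete = [];

    // Observable side: register callbacks.
    public function subscribe(callable $onNext, ?callable $onComplete = null): void
    {
        $this->onNext[] = $onNext;
        if ($onComplete !== null) {
            $this->onComplete[] = $onComplete;
        }
    }

    // Observer side: push an item to every subscriber.
    public function next($value): void
    {
        foreach ($this->onNext as $callback) {
            $callback($value);
        }
    }

    // Observer side: notify subscribers once, then forget them.
    public function complete(): void
    {
        foreach ($this->onComplete as $callback) {
            $callback();
        }
        $this->onNext = [];
        $this->onComplete = [];
    }
}

$received = [];
$completed = false;

$subject = new TinySubject();
$subject->subscribe(
    function (string $word) use (&$received): void {
        $received[] = \strtoupper($word);
    },
    function () use (&$completed): void {
        $completed = true;
    }
);

$subject->next('lorem');
$subject->next('ipsum');
$subject->complete();
$subject->next('ignored'); // no subscribers are left after complete()

print_r($received); // LOREM and IPSUM
```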

List of built-in operators

ArrayCount operator

Converts each array item in the stream into a number: its element count.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5] 
]);
$stream
    ->pipe(new Operator\ArrayCount())
    ->subscribe(function (int $count): void {
        // called twice with values: 2 and 3
    });

BufferCount operator

Groups individual items into arrays of the given size.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    ->pipe(new Operator\BufferCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });

BufferUniqueCount operator

Similar to BufferCount, but removes duplicates.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\BufferUniqueCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });

Count operator

Counts all items emitted in the stream.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\Count())
    ->subscribe(function (int $count): void {
        // called once with value: 9
    });

Filter operator

Removes every value for which the provided callback returns false.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Filter(function (int $number): bool {
        return $number % 2 === 0;
    }))
    ->subscribe(function (int $number): void {
        // called twice with values: 2, 4
    });

Flat operator

If an item in the stream is an array, Flat turns that array into a sequence of individual items.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([
    [1, 2],
    [3, 4, 5]
]);
$stream
    ->pipe(new Operator\Flat())
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });

Let operator

Does nothing except invoke the provided callback for each item in the stream; the value is passed on unchanged.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Let(function (int $number): void {
        // do something with this number, no need to return anything
    }))
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });

Map operator

Transforms each item in the stream into a new value.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create(['lorem', 'ipsum', 'dolor', 'sit', 'amet']);
$stream
    ->pipe(new Operator\Map(function (string $word): int {
        return \strlen($word);
    }))
    ->subscribe(function (int $length): void {
        // called 5 times with values: 5, 5, 5, 3, 4
    });

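Reduce operator

Combines all items in the stream into a single value, using the provided callback and an initial value.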

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Reduce(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });

Scan operator

Similar to Reduce, but the observer receives a value after every Scan call.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Scan(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called 5 times with values: 1, 3, 6, 10, 15
    });
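
The difference between the two accumulating operators can be sketched in plain PHP, without the library: a Reduce-like fold yields only the final value, while a Scan-like fold keeps every intermediate value. The variable names below are illustrative and not taken from the package.

```php
<?php

declare(strict_types=1);

$numbers = [1, 2, 3, 4, 5];
$add = function (int $sum, int $number): int {
    return $sum + $number;
};

// Reduce-like: only the final accumulated value is produced.
$reduced = array_reduce($numbers, $add, 0);

// Scan-like: every intermediate accumulated value is kept.
$scanned = [];
$accumulator = 0;
foreach ($numbers as $number) {
    $accumulator = $add($accumulator, $number);
    $scanned[] = $accumulator;
}

var_dump($reduced); // int(15)
print_r($scanned);  // 1, 3, 6, 10, 15
```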

Sum operator

Adds up all items in the stream.

<?php

use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Sum())
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });

Contributing

Pull requests, bug fixes and issue reports are welcome. Before proposing a change, please discuss it by creating an issue.