rx/stream

为 RxPHP 的异步流

3.0.2 2020-05-05 14:08 UTC

This package is auto-updated.

Last update: 2024-09-05 23:55:31 UTC


README

提供 RxPHP 观察者对象的 PHP 流库

这个库是对 ReactPHP 流库的包装。它使用 Voryx 事件循环,其行为类似于 JavaScript 事件循环。也就是说,您不需要启动它。

使用方法

从文件

    
    $source = new \Rx\React\FromFileObservable("example.csv");
    
    $source
        ->cut() //Cut the stream by PHP_EOL
        ->map('str_getcsv') //Convert csv row to an array
        ->map(function (array $row) {
            //Strip numbers from the first field
            $row[0] = preg_replace('/\d+/u', '', $row[0]);
            return $row;
        })
        ->subscribe(
            function ($data) {
                echo $data[0] . "\n";
            },
            function ($e) {
                echo "error\n";
            },
            function () {
                echo "done\n";
            }
        );
    

读取和写入文件

$source = new \Rx\React\FromFileObservable("source.txt");
$dest   = new \Rx\React\ToFileObserver("dest.txt");

$source
    ->cut()
    ->filter(function ($row) {
        return strpos($row, 'foo');
    })
    ->map(function ($row) {
        return $row . 'bar';
    })
    ->subscribe($dest);

流 - echo 示例

$read  = new \Rx\React\StreamSubject(STDIN);

$read
    ->takeWhile(function ($x) {
        return trim($x) != 15;
    })
    ->subscribe(new \Rx\React\StreamSubject(STDOUT));