rx / stream
Async streams for RxPHP
3.0.2
2020-05-05 14:08 UTC
Requires
- php: ^7.0
- react/stream: ^1.0 || ^0.7.1
- reactivex/rxphp: ^2.0
- rx/operator-extras: ^2.0
- voryx/event-loop: ^3.0 || ^2.0
Requires (Dev)
- phpunit/phpunit: ^5.7
README
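A PHP streams library that provides RxPHP Observables.
This library is a wrapper around the ReactPHP stream library. It uses the Voryx event loop, which behaves like the JavaScript event loop: you do not need to start it yourself.
Usage
From a file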
$source = new \Rx\React\FromFileObservable("example.csv");
$source
    ->cut() // Cut the stream by PHP_EOL
    ->map('str_getcsv') // Convert each CSV row to an array
    ->map(function (array $row) {
        // Strip numbers from the first field
        $row[0] = preg_replace('/\d+/u', '', $row[0]);
        return $row;
    })
    ->subscribe(
        function ($data) {
            echo $data[0] . "\n";
        },
        function ($e) {
            echo "error\n";
        },
        function () {
            echo "done\n";
        }
    );
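To illustrate what cutting a stream by a delimiter means, here is a minimal sketch in plain PHP. It assumes file data arrives in arbitrary chunks that do not line up with row boundaries. The `cutChunks` function is a hypothetical helper written for this illustration; it is not the library's implementation of `cut()`.

```php
<?php
// Hypothetical helper: models re-splitting chunked input on a delimiter.
// This is an illustration only, not how the library implements cut().
function cutChunks(array $chunks, string $delimiter = PHP_EOL): Generator
{
    $buffer = '';
    foreach ($chunks as $chunk) {
        $buffer .= $chunk;
        $parts = explode($delimiter, $buffer);
        // The last part may be an incomplete row; keep it for the next chunk.
        $buffer = array_pop($parts);
        foreach ($parts as $part) {
            yield $part;
        }
    }
    // Flush whatever remains once the input ends.
    if ($buffer !== '') {
        yield $buffer;
    }
}

// Two chunks whose boundary falls in the middle of the second row.
$rows = iterator_to_array(cutChunks(["a,1\nb,", "2\nc,3"], "\n"), false);
```

Each element of `$rows` is then a complete row, which is why `str_getcsv` can be applied to the output of `cut()` row by row.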
Reading from and writing to a file
$source = new \Rx\React\FromFileObservable("source.txt");
$dest = new \Rx\React\ToFileObserver("dest.txt");

$source
    ->cut()
    ->filter(function ($row) {
        // strpos() returns 0 for a match at the start of the row, so compare against false
        return strpos($row, 'foo') !== false;
    })
    ->map(function ($row) {
        return $row . 'bar';
    })
    ->subscribe($dest);
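A note on the `strpos()` check used in the filter: `strpos()` returns the offset of the match, or `false` when there is no match, so a match at offset 0 is falsy. The sketch below uses plain PHP arrays and made-up sample rows (an assumption for illustration) to compare a truthy check with a strict one.

```php
<?php
// Sample rows invented for this illustration.
$rows = ['foo first', 'has foo inside', 'no match'];

// Truthy check: strpos() returns 0 for 'foo first', which is falsy.
$truthy = array_values(array_filter($rows, function ($row) {
    return strpos($row, 'foo');
}));

// Strict check: keeps every row that contains 'foo'.
$strict = array_values(array_filter($rows, function ($row) {
    return strpos($row, 'foo') !== false;
}));
```

With the truthy check only 'has foo inside' survives, while the strict comparison also keeps 'foo first'.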
Streams: echo example
$read = new \Rx\React\StreamSubject(STDIN);

$read
    ->takeWhile(function ($x) {
        return trim($x) != 15;
    })
    ->subscribe(new \Rx\React\StreamSubject(STDOUT));