reactivex/rxphp

PHP 的响应式扩展。

2.0.13 2024-09-21 23:52 UTC

README

PHP 的响应式扩展。PHP 的响应式扩展是一组库,用于使用可观察流组合异步和基于事件的程序。

CI status Coverage Status

示例

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$source->subscribe(
    function ($x) {
        echo 'Next: ', $x, PHP_EOL;
    },
    function (Exception $ex) {
        echo 'Error: ', $ex->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }
);

//Next: 1
//Next: 2
//Next: 3
//Next: 4
//Completed

尝试演示

$ git clone https://github.com/ReactiveX/RxPHP.git
$ cd RxPHP
$ composer install
$ php demo/interval/interval.php

/demo 中运行演示。

注意:运行演示时,调度器会自动启动。当在您的项目中使用 RxPHP 时,您需要设置默认调度器。

安装

  1. 安装事件循环。任何事件循环都应工作,但建议使用 ReactPHP 事件循环。
$ composer require react/event-loop
  1. 使用 composer 安装 RxPHP。
$ composer require reactivex/rxphp
  1. 编写一些代码。
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use React\EventLoop\Factory;
use Rx\Scheduler;

$loop = Factory::create();

//You only need to set the default scheduler once
Scheduler::setDefaultFactory(function() use($loop){
    return new Scheduler\EventLoopScheduler($loop);
});

Observable::interval(1000)
    ->take(5)
    ->flatMap(function ($i) {
        return Observable::of($i + 1);
    })
    ->subscribe(function ($e) {
        echo $e, PHP_EOL;
    });

$loop->run();

与 Promise 一起工作

一些异步 PHP 框架尚未完全拥抱可观察对象的强大功能。为了帮助缓解过渡,RxPHP 内置了对 ReactPHP promises 的支持。

将 Promise 混入可观察流中

Observable::interval(1000)
    ->flatMap(function ($i) {
        return Observable::fromPromise(\React\Promise\resolve(42 + $i));
    })
    ->subscribe(function ($v) {
        echo $v . PHP_EOL;
    });

将可观察对象转换为 Promise。(这对于使用生成器和协程的库很有用)

$observable = Observable::interval(1000)
    ->take(10)
    ->toArray()
    ->map('json_encode');

$promise = $observable->toPromise();

更多信息

许可证

RxPHP 使用 MIT 许可证授权 - 请参阅 LICENSE 文件以获取详细信息