domraider/rxnet

PHP 的 Rx 连接器


README

License Latest Stable Version Total Downloads Latest Unstable Version composer.lock

RxPhp 是一项伟大的工作,它为我们带来了响应式编程:面向人类的异步编程。
您可以在 RxMarble.com 上进行响应式X的实验,在官方的 Reactivex.io 网站上查找所有可用的操作符,或阅读一篇 有趣的介绍

RxNet 是一项旨在“开箱即用”的努力。

感谢 react/react,它的奇妙反应模式以及与之相关的所有工作,许多只是简单的封装。

安装

使用 composer : domraider/rxnet

或直接克隆库,运行 composer install 并尝试示例脚本。

为什么有一个仓库包含所有内容?因为当您开始使用它时,您希望每个库都符合 RxFriendly :)

Dns

异步 DNS 解析器。感谢 daverandom/libdns 解析器,代码编写变得非常容易。

不需要额外的扩展

$dns = new Dns();
// Procedural way
echo Rx\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));

// All types of queries are allowed
$dns->soa('www.google.fr')
  ->subscribe(new StdoutObserver());

Http

具有所有 ReactiveX 优点的 HTTP 客户端

不需要额外的扩展

$http = new Http();
$http->get("https://github.com/Domraider/rxnet/commits/master")
  // Timeout after 0.3s
  ->timeout(300)
  // will retry 2 times on error 
  ->retry(2)
  // Transform response to something else
  ->map(function(Psr\Http\Message\ResponseInterface $response) {
  	$body = (string) $response->getBody();
	// Domcrawler to extract commits
    return $body;
  })
  ->subscribe(new StdoutObserver());

// All the given options
$opts = [
  // No buffering, you will receive chunks has they arrived
  // Perfect for big files to parse or streaming json
  'stream' => true,
  // You can use body, json or form_params
  // * json will add the header and json_encode
  // * form_params will build query in body and add application/x-www-form-urlencoded header
  'body' => 'raw body for post',
  'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
  'form_param' => ['param_0'=>'value_0', 'param_1'=>'value_1'],
  // Set query string from here not the url
  'query'=> [
    'param1'=>'one'
  ],
  // Use a proxy
  'proxy' => 'http://user:password@myproxy:8080',
  // Append extra headers
  'headers' => [
    'Authorization' => 'Basic '.base64_encode('user:password'),
    // Specify user-agent to use
    'User-Agent' => 'SuperParser/0.1',
  ], 
  // see https://php.ac.cn/manual/en/context.ssl.php
  // Add whatever option you want on your https query
  'ssl' => [
    'verify_peer' => false
  ],
  // allow redirect
  'allow_redirects' => true,
  // or
  'allow_redirects' => [
    // max redirects to follow
    'max' => 10
  ]
];

$http->post('https://adwords.google.com/my-10gb.xml', $opts)
  ->subscribeCallback(function($chunk) {
    // let's give it to expat while it arrives
  });

待办事项

[ ] Psr7 请求/响应

[ ] 多部分

[ ] gzip/deflate

HttpD

基于 react/http 的独立 HTTP 服务器,默认路由器为 nikic/fast-route

不需要额外的扩展

$httpd = new HttpD();
$httpd->route('GET', '/', function(HttpdRequest $request, HttpdResponse $response) {
  $response->text('Hello');
});
$httpd->route('POST', '/{var}', function(HttpdRequest $request, HttpdResponse $response) {
  $var = $request->getRouteParam('var');
  $response->json(['var'=>$var]);
});
$httpd->listen(8080);

在 MacBook Pro 上的性能约为每秒 500 条消息。

请记住,它不需要任何 fpm 即可运行。默认的 fpm 配置为 10 个子进程。

待办事项

[ ] Psr7 请求/响应

[ ] gzip/deflate

[ ] http2

[ ] 多部分?

[ ] ssl?:D

RabbitMq

来自 jakubkulhan/bunny 的包装器,它运行得很好

不需要额外的扩展

消费

$rabbit = new RabbitMq('rabbit://guest:guest@127.0.0.1:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());

// Will wait for messages
$rabbit->consume()
    ->subscribeCallback(function (RabbitMessage $message) use ($debug, $rabbit) {
        echo '.';
        $data = $message->getData();
        $name = $message->getName();
        $head = $message->getLabels();
        // Do what you want but do one of this to get next
        $message->ack();
        //$message->nack();
        //$message->reject();
        //$message->rejectToBottom();
    });

生产

$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('amq.direct');

$rabbit->connect()
    ->zip([
      // Declare all the binding
        $queue->create($queue::DURABLE),
        $queue->bind('/routing/key', 'amq.direct'),
        $exchange->create($exchange::TYPE_DIRECT, [
            $exchange::DURABLE,
            $exchange::AUTO_DELETE
        ])
    ])
    // Everything's done let's produce
    ->subscribeCallback(function () use ($exchange, $loop) {
        $done = 0;
		// Just a simple array
        \Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
            // Wait for one produce to be done before starting another
            ->flatMap(function ($data) use ($exchange) {
                // Rabbit will handle serialize and unserialize
                return $exchange->produce($data, '/routing/key');
            })
            // Produce it many times
            ->repeat($10000)
            // Let's get some stats
            ->subscribeCallback(
                function () use (&$done) { $done++;}, 
                function (\Exception $e) { echo "shit happens : ".$e->getMessage();}, 
                function () use (&$done, $loop) { echo number_format($done)." lines produced"; }
        	);
    });

待办事项

[ ] 将所有可能的选项添加为常量

Redis

来自 clue/redis 的包装器(伟大的工作!)

不需要额外的扩展

$redis = new Redis();

// Wait for redis to be ready
$redis = RxNet\awaitOnce($redis->connect('127.0.0.1:6379'));

$redis->get('key')
  ->subscribe(new StdoutObserver());
// Every redis operators return an observable
// And they are all implemented

ZeroMq

通过 TCP(或 ipc 或 inproc)进行消息交换。

需要 Pecl ZMQ 扩展才能运行

路由器/经销商是双向通信。经销商将等待路由器,路由器不会,因此经销商必须首先启动

$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output
$dealer->subscribeCallback(new StdoutObserver())	
// And start
$dealer->send(new PingCommand('ping'));
// Bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');
// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
  ->delay(100)
  ->subscribeCallback(function($data) use($router) {
  	$router->send(new ReceivedEvent('pong'), 'Roger');
  });

不同的协议

您可以进行 push/pullreq/rep,阅读 ØMQ - The Guide 以查看哪种网络模型适合您。

路由器经销商每秒 5k 到 10k 条消息。推送/拉取每秒 30k 条消息。

待办事项

[ ] pub/sub

InfluxDB

基于 influxdata/influxdb-php 的 InfluxDB 客户端。目前仅支持 UDP 协议(仅限写入)。我们的主要目标是实现一个非阻塞客户端以发送指标。

Statsd

基于 此 gistphp-datadogstatsd 的 Statsd 客户端,支持标签功能。

$statsd->gauge("database.connections", 42)
  ->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));

MySQL

mysql 客户端使用 mysqli。

$conn = new Rxnte\Mysql\Connection([
    'host' => 'localhost',
    'user' => 'myUser',
    'password' => 'myPasswd',
    'database' => 'myDb'
]);

$conn->query('SELECT NOW()');

$conn->transaction(['SELECT NOW()']);

Sweet

AwaitOnce

经典的程序式方法始终是可行的,但要注意副作用。

$observable = $http->get('http://www.google.fr')
  ->timeout(1000)
  ->retry(3);
// Loop continue to go forward during await
$response = Rxnet\awaitOnce($observable);
// but this echo will wait before running
echo "1";

Await

使用 rx/await,可以将你的可观察对象转换为生成器。

$source = \Rx\Observable::interval(1000)
    ->take(5); //Limit items to 5

$generator = \Rx\await($source);

foreach ($generator as $item) {
    echo $item, PHP_EOL;
}
echo "DONE";

按需

// Great to read gigabytes without memory leaks
$reader = new \Rxnet\OnDemand\OnDemandFileReader("./test.csv");
$reader->getObservable()
    ->subscribeCallback(
        function ($row) use ($reader) {
            echo "got row : {$row}\n";
            // read next octet
            $reader->produceNext();
        },
        null,
        function() {
            echo "------------------\n";
            echo "read completed\n";
        }
    );
$reader->produceNext(1);

OnBackPressureBuffer

$backPressure = new \Rxnet\Operator\OnBackPressureBuffer(
    5, // Buffer capacity 
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable) 
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);

OnBackPressureBufferFile

如果消费速度低于生产速度,则下一个元素将被写入给定文件夹中的文件。

启动时,读取缓冲区的路径以查找现有和未消费的事件。

$backPressure = new \Rxnet\Operator\OnBackPressureBufferFile(
    './', // Folder to write files
    new MsgPack(), // Serializer to use
    -1, // Buffer capacity, -1 for unlimited
    function($next, \SplQueue $queue) {echo "Buffer overflow";}, // Callable on buffer full (nullable) 
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // strategy on overflow
);

\Rx\Observable::interval(1000)
    ->doOnNext(function($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);