domraider / rxnet
Rx connectors for PHP
Requires
- php: ^5.6 || ^7.0
- anahkiasen/underscore-php: ^2.0
- clue/redis-react: ^1.0
- domraider/bunny: @dev
- domraider/libdns: ^1.1
- guzzlehttp/guzzle: ^6.0
- guzzlehttp/psr7: ~1.4.0
- nikic/fast-route: ^1.2
- phpoption/phpoption: ^1.5
- ramsey/uuid: ^3.5.1
- react/react: 0.4.2
- reactivex/rxphp: ^1.1
- voryx/event-loop: 0.2.*
Requires (Dev)
- bcrowe/growl: ^2.0
- cloak/peridot-cloak-plugin: ^2.0
- holyshared/file-fixture: ^2.0
- peridot-php/leo: ^1.5
- peridot-php/peridot-code-coverage-reporters: ^2.0
- peridot-php/peridot-concurrency: ^1.1
- peridot-php/peridot-dot-reporter: ^1.0
- peridot-php/peridot-list-reporter: ^1.0
- peridot-php/peridot-prophecy-plugin: ^1.1
- peridot-php/peridot-watcher-plugin: ^1.3
Suggests
- ext-mysqli: *
- ext-zmq: *
Versions
- dev-master
- 0.6.18
- 0.6.17
- 0.6.16
- 0.6.15
- 0.6.14
- 0.6.13
- 0.6.12
- 0.6.11
- 0.6.9
- 0.6.8
- 0.6.6
- 0.6.5
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.1
- 0.5.0
- 0.4.1
- 0.4.0
- 0.3.3
- 0.3.1
- 0.3.0
- 0.1.0
- dev-alternate
- dev-revert-45-revert-44-feature/yolo-routing
- dev-fix/router-exact-match
- dev-fix/regression-buffered-stream
- dev-feature/react-socket
- dev-label-interface
- dev-feature/backpressure
- dev-feature/rabbit
This package is not auto-updated.
Last update: 2024-09-12 10:20:53 UTC
README
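RxPhp is a great piece of work that brings us reactive programming: asynchronous programming for human beings.
You can experiment with ReactiveX on RxMarble.com, find all the available operators on the official Reactivex.io website, or read an interesting introduction.
RxNet is an effort to bring it "batteries included".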
- Dns
- Http
- Httpd
- RabbitMq
- Redis
- ZeroMq
- InfluxDB
- Statsd
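- Other external connectors
- voryx/pg-async Postgres client
- RxPHP/RxStream streams
- RxPHP/RxWebsocket WebSocket client/server
- RxPHP/RxChildProcess process spawning
Thanks to react/react, its marvelous reactor pattern and all the work built around it; many of these connectors are just simple wrappers.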
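Installation
With composer: composer require domraider/rxnet
Or just clone the library, run composer install
and try the example scripts.
Why one repository for everything? Because once you start using it, you want every library to be RxFriendly :)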
Dns
Asynchronous DNS resolver. Thanks to the daverandom/libdns parser, it was a breeze to code.
No extra extensions are needed
$dns = new Dns();
// Procedural way
echo Rxnet\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));
// All types of queries are allowed
$dns->soa('www.google.fr')
    ->subscribe(new StdoutObserver());
Http
HTTP client with all the ReactiveX goodness
No extra extensions are needed
$http = new Http();
$http->get("https://github.com/Domraider/rxnet/commits/master")
    // Timeout after 0.3s
    ->timeout(300)
    // Will retry 2 times on error
    ->retry(2)
    // Transform the response into something else
    ->map(function (Psr\Http\Message\ResponseInterface $response) {
        $body = (string) $response->getBody();
        // Domcrawler to extract commits
        return $body;
    })
    ->subscribe(new StdoutObserver());

// All the available options
$opts = [
    // No buffering, you will receive chunks as they arrive
    // Perfect for big files to parse or streaming json
    'stream' => true,
    // You can use body, json or form_params
    // * json will add the header and json_encode
    // * form_params will build query in body and add application/x-www-form-urlencoded header
    'body' => 'raw body for post',
    'json' => ['my' => 'parameters', 'they-will->be' => 'json'],
    'form_params' => ['param_0' => 'value_0', 'param_1' => 'value_1'],
    // Set the query string from here, not in the url
    'query' => [
        'param1' => 'one'
    ],
    // Use a proxy
    'proxy' => 'http://user:password@myproxy:8080',
    // Append extra headers
    'headers' => [
        'Authorization' => 'Basic ' . base64_encode('user:password'),
        // Specify the user-agent to use
        'User-Agent' => 'SuperParser/0.1',
    ],
    // See https://php.ac.cn/manual/en/context.ssl.php
    // Add whatever option you want on your https query
    'ssl' => [
        'verify_peer' => false
    ],
    // Allow redirects
    'allow_redirects' => true,
    // or, to cap the number of redirects to follow:
    // 'allow_redirects' => ['max' => 10],
];
$http->post('https://adwords.google.com/my-10gb.xml', $opts)
    ->subscribeCallback(function ($chunk) {
        // let's give it to expat while it arrives
    });
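The body, json and form_params options differ only in how the payload is encoded and which Content-Type header goes with it. The sketch below illustrates that normalization in plain PHP. `buildRequestBody` is a hypothetical helper written for this example, not part of rxnet, and the precedence it applies when several options are set (json, then form_params, then body) is an assumption.

```php
<?php
/**
 * Hypothetical helper (not part of rxnet): turns the 'body', 'json' or
 * 'form_params' option into a raw body plus the matching Content-Type header.
 * Assumption: when several are set, 'json' wins, then 'form_params', then 'body'.
 */
function buildRequestBody(array $opts)
{
    $headers = isset($opts['headers']) ? $opts['headers'] : [];

    if (isset($opts['json'])) {
        // json: encode the payload and declare it as JSON
        $headers['Content-Type'] = 'application/json';
        return [json_encode($opts['json']), $headers];
    }
    if (isset($opts['form_params'])) {
        // form_params: build a url-encoded query string as the body
        $headers['Content-Type'] = 'application/x-www-form-urlencoded';
        return [http_build_query($opts['form_params']), $headers];
    }
    // body: sent as-is, no header added
    return [isset($opts['body']) ? (string) $opts['body'] : '', $headers];
}

list($body, $headers) = buildRequestBody([
    'form_params' => ['param_0' => 'value_0', 'param_1' => 'value_1'],
]);
echo $body, PHP_EOL; // param_0=value_0&param_1=value_1
```

In practice only one of the three options would normally be passed per request.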
Todo
[ ] Psr7 request/response
[ ] multipart
[ ] gzip/deflate
HttpD
Standalone HTTP server based on react/http, with nikic/fast-route as the default router
No extra extensions are needed
$httpd = new HttpD();
$httpd->route('GET', '/', function (HttpdRequest $request, HttpdResponse $response) {
    $response->text('Hello');
});
$httpd->route('POST', '/{var}', function (HttpdRequest $request, HttpdResponse $response) {
    $var = $request->getRouteParam('var');
    $response->json(['var' => $var]);
});
$httpd->listen(8080);
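A pattern such as '/{var}' binds one path segment to a named route parameter. The sketch below shows one way a placeholder pattern could be compiled to a regular expression in plain PHP. `matchRoute` is a hypothetical function for illustration only; it is not how nikic/fast-route is implemented and it ignores HTTP methods and placeholder constraints.

```php
<?php
/**
 * Hypothetical sketch, not the fast-route implementation: match a path against
 * a pattern such as '/{var}' and return the captured parameters, or null.
 */
function matchRoute($pattern, $path)
{
    // Split on placeholders and keep them: '/users/{id}' => ['/users/', '{id}', '']
    $parts = preg_split('/(\{\w+\})/', $pattern, -1, PREG_SPLIT_DELIM_CAPTURE);
    $regex = '';
    foreach ($parts as $part) {
        if (preg_match('/^\{(\w+)\}$/', $part, $m)) {
            // A placeholder matches exactly one path segment
            $regex .= '(?P<' . $m[1] . '>[^/]+)';
        } else {
            // Static text must match literally
            $regex .= preg_quote($part, '#');
        }
    }
    if (!preg_match('#^' . $regex . '$#', $path, $matches)) {
        return null;
    }
    // Keep only the named captures, drop the numeric ones
    return array_filter($matches, 'is_string', ARRAY_FILTER_USE_KEY);
}

$params = matchRoute('/{var}', '/hello');
echo $params['var'], PHP_EOL; // hello
```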
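Performance is around 500 messages per second on a MacBook Pro.
Keep in mind that it runs without fpm, whereas a default fpm configuration uses 10 child processes.
Todo
[ ] Psr7 request/response
[ ] gzip/deflate
[ ] http2
[ ] multipart?
[ ] ssl? :D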
RabbitMq
Wrapper around jakubkulhan/bunny, which works just fine
No extra extensions are needed
Consume
$rabbit = new RabbitMq('rabbit://guest:guest@127.0.0.1:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());

// Will wait for messages
$rabbit->consume()
    ->subscribeCallback(function (RabbitMessage $message) use ($rabbit) {
        echo '.';
        $data = $message->getData();
        $name = $message->getName();
        $head = $message->getLabels();
        // Do what you want, but call one of these to get the next message
        $message->ack();
        //$message->nack();
        //$message->reject();
        //$message->rejectToBottom();
    });
Produce
$queue = $rabbit->queue('test_queue', []);
$exchange = $rabbit->exchange('amq.direct');

$rabbit->connect()
    ->zip([
        // Declare all the bindings
        $queue->create($queue::DURABLE),
        $queue->bind('/routing/key', 'amq.direct'),
        $exchange->create($exchange::TYPE_DIRECT, [
            $exchange::DURABLE,
            $exchange::AUTO_DELETE
        ])
    ])
    // Everything's done, let's produce
    ->subscribeCallback(function () use ($exchange, $loop) {
        $done = 0;
        // Just a simple array
        \Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
            // Wait for one produce to be done before starting another
            ->flatMap(function ($data) use ($exchange) {
                // Rabbit will handle serialize and unserialize
                return $exchange->produce($data, '/routing/key');
            })
            // Produce it many times
            ->repeat(10000)
            // Let's get some stats
            ->subscribeCallback(
                function () use (&$done) { $done++; },
                function (\Exception $e) { echo "Error: " . $e->getMessage(); },
                function () use (&$done, $loop) { echo number_format($done) . " lines produced"; }
            );
    });
Todo
[ ] add all possible options as constants
Redis
Wrapper around clue/redis (great work!)
No extra extensions are needed
$redis = new Redis();
// Wait for redis to be ready
$redis = Rxnet\awaitOnce($redis->connect('127.0.0.1:6379'));

$redis->get('key')
    ->subscribe(new StdoutObserver());
// Every Redis operator returns an observable
// And they are all implemented
ZeroMq
Message exchange over TCP (or ipc or inproc).
Requires the Pecl ZMQ extension to run
Router/dealer is a bidirectional communication. The dealer waits for the router but the router does not wait for the dealer, so the dealer must be started first.
$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output
$dealer->subscribe(new StdoutObserver());
// And start
$dealer->send(new PingCommand('ping'));

// Bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');
// Show received messages and wait 0.1s before answering
$router->doOnEach(new StdOutObserver())
    ->delay(100)
    ->subscribeCallback(function ($data) use ($router) {
        $router->send(new ReceivedEvent('pong'), 'Roger');
    });
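Different protocols
You can also do push/pull or req/rep; read ØMQ - The Guide to see which network model fits your needs.
Router/dealer handles 5k to 10k messages per second. Push/pull handles 30k messages per second.
Todo
[ ] pub/sub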
InfluxDB
InfluxDB client based on influxdata/influxdb-php. For now only the UDP protocol is supported (write only). Our main goal is a non-blocking client for sending metrics.
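Writing over UDP means each metric leaves as a datagram in InfluxDB line protocol. The sketch below only shows how one point could be formatted; `formatPoint` is a hypothetical helper, not the rxnet or influxdb-php API, and it is simplified to numeric fields with basic escaping.

```php
<?php
/**
 * Hypothetical helper (not the rxnet or influxdb-php API): format one point in
 * InfluxDB line protocol: measurement,tag=value field=value timestamp
 * Simplification: fields are numeric only; integers get the 'i' suffix.
 */
function formatPoint($measurement, array $fields, array $tags = [], $timestampNs = null)
{
    // Commas, spaces and equals signs are backslash-escaped in keys and tag values
    $escape = function ($value) {
        return str_replace([',', ' ', '='], ['\\,', '\\ ', '\\='], (string) $value);
    };

    $line = str_replace([',', ' '], ['\\,', '\\ '], $measurement);

    // Tags are optional; sorting them by key is the recommended order
    ksort($tags);
    foreach ($tags as $key => $value) {
        $line .= ',' . $escape($key) . '=' . $escape($value);
    }

    $pairs = [];
    foreach ($fields as $key => $value) {
        $pairs[] = $escape($key) . '=' . (is_int($value) ? $value . 'i' : (float) $value);
    }
    $line .= ' ' . implode(',', $pairs);

    // The timestamp (nanoseconds) is optional; the server uses its own clock without it
    return $timestampNs === null ? $line : $line . ' ' . $timestampNs;
}

echo formatPoint('cpu', ['value' => 0.64], ['host' => 'server01']), PHP_EOL;
// cpu,host=server01 value=0.64
```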
Statsd
Statsd client based on this gist and php-datadogstatsd, with tag support.
$statsd->gauge("database.connections", 42)
    ->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));
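For reference, a tagged gauge in the DogStatsD wire format is a single short text datagram. The sketch below builds such a datagram in plain PHP; `formatGauge` is a hypothetical helper for illustration and says nothing about how rxnet assembles or sends its packets.

```php
<?php
/**
 * Hypothetical helper (not the rxnet API): build a gauge datagram in the
 * DogStatsD wire format: name:value|g|#tag1:value1,tag2:value2
 */
function formatGauge($name, $value, array $tags = [])
{
    // 'g' is the metric type for gauges
    $datagram = $name . ':' . $value . '|g';
    if ($tags) {
        $pairs = [];
        foreach ($tags as $key => $tagValue) {
            $pairs[] = $key . ':' . $tagValue;
        }
        // Tags form an optional, comma-separated trailing section
        $datagram .= '|#' . implode(',', $pairs);
    }
    return $datagram;
}

echo formatGauge('database.connections', 42, ['env' => 'prod']), PHP_EOL;
// database.connections:42|g|#env:prod
```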
MySQL
MySQL client built on mysqli.
$conn = new Rxnet\Mysql\Connection([
    'host' => 'localhost',
    'user' => 'myUser',
    'password' => 'myPasswd',
    'database' => 'myDb'
]);
$conn->query('SELECT NOW()');
$conn->transaction(['SELECT NOW()']);
Sweet
AwaitOnce
The classic procedural style is always possible, but watch out for side effects.
$observable = $http->get('http://www.google.fr')
    ->timeout(1000)
    ->retry(3);
// The event loop keeps running during the await
$response = Rxnet\awaitOnce($observable);
// but this echo only runs once the await has returned
echo "1";
Await
With rx/await, you can turn your observable into a generator.
$source = \Rx\Observable::interval(1000)
    ->take(5); // Limit items to 5
$generator = \Rx\await($source);
foreach ($generator as $item) {
    echo $item, PHP_EOL;
}
echo "DONE";
On demand
// Great to read gigabytes without memory leaks
$reader = new \Rxnet\OnDemand\OnDemandFileReader("./test.csv");
$reader->getObservable()
    ->subscribeCallback(
        function ($row) use ($reader) {
            echo "got row : {$row}\n";
            // request the next item
            $reader->produceNext();
        },
        null,
        function () {
            echo "------------------\n";
            echo "read completed\n";
        }
    );
$reader->produceNext(1);
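The pull model used here can be sketched without any library: nothing is read until the consumer asks for it. `PullLineReader` below is a hypothetical class written for illustration, not the rxnet implementation; it is synchronous, and it queues re-entrant requests instead of recursing so that a long file does not grow the call stack.

```php
<?php
/**
 * Hypothetical sketch of the pull model (not the rxnet implementation):
 * a line is read only when the consumer asks for it with produceNext().
 */
class PullLineReader
{
    private $handle;
    private $onNext;
    private $onCompleted;
    private $requested = 0;
    private $draining = false;
    private $done = false;

    public function __construct($path, callable $onNext, callable $onCompleted)
    {
        $this->handle = fopen($path, 'r');
        $this->onNext = $onNext;
        $this->onCompleted = $onCompleted;
    }

    public function produceNext($count = 1)
    {
        $this->requested += $count;
        if ($this->draining) {
            return; // called from inside a callback: the loop below picks it up
        }
        $this->draining = true;
        while ($this->requested > 0 && !$this->done) {
            $this->requested--;
            $line = fgets($this->handle);
            if ($line === false) {
                // End of file: release the handle and notify once
                $this->done = true;
                fclose($this->handle);
                call_user_func($this->onCompleted);
                break;
            }
            call_user_func($this->onNext, rtrim($line, "\r\n"));
        }
        $this->draining = false;
    }
}

// Usage on a small temporary file
$path = tempnam(sys_get_temp_dir(), 'rows');
file_put_contents($path, "a;1\nb;2\nc;3\n");

$rows = [];
$completed = false;
$reader = new PullLineReader(
    $path,
    function ($row) use (&$rows, &$reader) {
        $rows[] = $row;
        $reader->produceNext(); // ask for the next row once this one is handled
    },
    function () use (&$completed) {
        $completed = true;
    }
);
$reader->produceNext(1); // nothing is read before this first request
unlink($path);
```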
OnBackPressureBuffer
$backPressure = new \Rxnet\Operator\OnBackPressureBuffer(
    5, // Buffer capacity
    function ($next, \SplQueue $queue) { echo "Buffer overflow"; }, // Callable on buffer full (nullable)
    \Rxnet\Operator\OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // Strategy on overflow
);
\Rx\Observable::interval(1000)
    ->doOnNext(function ($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);
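The idea behind the operator can be sketched in a few lines of plain PHP: items wait in an SplQueue until the consumer signals it is ready, and the buffer fails once its capacity is exceeded. `BoundedBuffer` is a hypothetical class for illustration, not the rxnet operator; treating the consumer as ready for one item at start is an assumption.

```php
<?php
/**
 * Hypothetical sketch of a back-pressure buffer (not the rxnet operator):
 * items wait in a queue until the consumer calls request().
 */
class BoundedBuffer
{
    private $queue;
    private $capacity;
    private $emit;
    private $pending = 1; // assumption: the consumer is ready for a first item

    public function __construct($capacity, callable $emit)
    {
        $this->queue = new SplQueue();
        $this->capacity = $capacity;
        $this->emit = $emit;
    }

    /** Called by the producer for every new item. */
    public function push($item)
    {
        if ($this->pending > 0) {
            // The consumer is waiting: hand the item over directly
            $this->pending--;
            call_user_func($this->emit, $item);
            return;
        }
        if (count($this->queue) >= $this->capacity) {
            // "Error" overflow strategy: fail instead of dropping items
            throw new OverflowException('Buffer overflow');
        }
        $this->queue->enqueue($item);
    }

    /** Called by the consumer when it is ready for one more item. */
    public function request()
    {
        if ($this->queue->isEmpty()) {
            $this->pending++;
            return;
        }
        call_user_func($this->emit, $this->queue->dequeue());
    }
}

$received = [];
$buffer = new BoundedBuffer(2, function ($item) use (&$received) {
    $received[] = $item;
});
$buffer->push(0);   // consumer idle: delivered immediately
$buffer->push(1);   // consumer busy: buffered
$buffer->push(2);   // buffered, the buffer is now full
$buffer->request(); // consumer is done with 0 and receives 1
try {
    $buffer->push(3); // buffered, one slot was freed
    $buffer->push(4); // capacity exceeded
} catch (OverflowException $e) {
    echo $e->getMessage(), PHP_EOL; // Buffer overflow
}
```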
OnBackPressureBufferFile
If consumption is slower than production, the next elements are written to files in the given folder.
On startup, the buffer path is read to find existing, unconsumed events.
$backPressure = new \Rxnet\Operator\OnBackPressureBufferFile(
    './', // Folder to write files
    new MsgPack(), // Serializer to use
    -1, // Buffer capacity, -1 for unlimited
    function ($next, \SplQueue $queue) { echo "Buffer overflow"; }, // Callable on buffer full (nullable)
    \Rxnet\Operator\OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR // Strategy on overflow
);
\Rx\Observable::interval(1000)
    ->doOnNext(function ($i) {
        echo "produce {$i} ";
    })
    ->lift($backPressure->operator())
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i)
            ->delay(3000);
    })
    ->doOnNext([$backPressure, 'request'])
    ->subscribe($stdout, $scheduler);
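The persistence behaviour described above (one file per pending element, rediscovered on startup) can be sketched as follows. `FileBuffer` is a hypothetical class for illustration, not the rxnet operator; PHP's serialize() stands in for the MsgPack serializer, and the file naming scheme is an assumption.

```php
<?php
/**
 * Hypothetical sketch (not the rxnet operator): a FIFO buffer persisted as one
 * file per item, so unconsumed items survive a restart.
 * Assumption: PHP's serialize() stands in for the MsgPack serializer.
 */
class FileBuffer
{
    private $folder;
    private $files = [];
    private $sequence = 0;

    public function __construct($folder)
    {
        $this->folder = rtrim($folder, '/');
        // On startup, pick up items left over from a previous run
        $found = glob($this->folder . '/*.buffer') ?: [];
        sort($found);
        $this->files = $found;
        if ($found) {
            $this->sequence = (int) basename(end($found), '.buffer') + 1;
        }
    }

    public function push($item)
    {
        // Zero-padded names keep alphabetical order equal to insertion order
        $file = sprintf('%s/%012d.buffer', $this->folder, $this->sequence++);
        file_put_contents($file, serialize($item));
        $this->files[] = $file;
    }

    /** Returns the oldest item and deletes its file; throws when empty. */
    public function shift()
    {
        if (!$this->files) {
            throw new UnderflowException('Buffer is empty');
        }
        $file = array_shift($this->files);
        $item = unserialize(file_get_contents($file));
        unlink($file);
        return $item;
    }

    public function count()
    {
        return count($this->files);
    }
}

$folder = sys_get_temp_dir() . '/buffer-' . uniqid();
mkdir($folder);

$buffer = new FileBuffer($folder);
$buffer->push(['id' => 1]);
$buffer->push(['id' => 2]);
$first = $buffer->shift(); // oldest item, ['id' => 1]

// Simulate a restart: a new instance finds the unconsumed item on disk
$restarted = new FileBuffer($folder);
$second = $restarted->shift(); // ['id' => 2]
rmdir($folder);
```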