hhreact/hhreactor

ReactiveX算子,迭代器风格。

dev-master 2018-12-09 04:11 UTC

This package is not auto-updated.

Last update: 2024-09-24 05:46:05 UTC


README

$ composer require hhreact/hhreactor

入门

是什么

如果你了解什么是InteractiveX,HHReactor与InteractiveX理念紧密匹配,实际上只是在绑定操作符上略有不同。

如果你了解什么是ReactiveX,InteractiveX是“拉”的类似物,其中可枚举类型代替了观察者。

如果你了解AsyncGeneratorAsyncIterator,HHReactor允许你克隆、缩减和扩展异步生成器,并允许你对缓冲区强制执行精确的策略。

实际应用

<?hh
// Producer and connection_factory contain most of the functionality
use HHReactor\Producer; // *
use function HHReactor\HTTP\connection_factory; // **
use HHReactor\WebSocket\RFC6455;

\HH\Asio\join(async {

	///////////////
	// * BASIC * //
	///////////////
	
	// Start with any AsyncIterator
	$iter_numbers = async {
		for($i = 0; ; $i++) {
			yield $i;
			await \HH\Asio\later();
		}
	};

	// Now make it really P R O D U C E
	$number_producer = Producer::create($iter_numbers);

	// Transform a stream, e.g. map
	$square_producer = (clone $number_producer)->map(async ($root) ==> pow($root, 2));
	// Transform two streams into one, e.g. zip
	$cube_producer = Producer::zip($number_producer, $square_producer, ($root, $square) ==> $root * $square);
	// Transform many streams into one, e.g. merge
	foreach(Producer::merge(Vector{ $number_producer, $square_producer, $cube_producer }) await as $some_number) {
		// numbers flying at your face! Beware: no guaranteed order with `merge`
	}

	// Note that Producer wraps transparently:
	foreach(clone $producer await as $item) { /* same items as $iter_numbers */ }
	
	////////////////
	// ** HTTP ** //
	////////////////
	
	// Merge stream of requests from ports 80 and 8080
	$http_firehose = Producer::merge(Vector{ connection_factory(80), new connection_factory(8080) });
	
	// To cancel/dispose, just use what the language gives you: `break`, `return` and `throw`;
	//  the iterating scope is in full control.
	foreach(clone $http_firehose await as $connection) {
		await $connection->write('No, _you_ deal with this');
		break; // great for if you don't like commitment
		
		// The "Details of Producer" section further down explains what
		//  happens when you cancel a Producer partway
	}
	
	// If you're up for it, do something more interesting than quitting immediately
	foreach(clone $http_firehose await as $maybe_connection) {
		try {
			// try to parse headers; fail if client fails to send them all
			$connection = await $maybe_connection;
			
			$request = $connection->get_request();
			if($request->getHeader('Upgrade') === 'websocket') {
				$handler = $websocket_router->route($request->getUri());
				
				// wrap the connection object in a `WebSocketConnection` to
				//  handle handshake and websocket frames
				$handler(new RFC6455($connection))
			}
			else {
				// non-websocket requests
				$handler = $router->route($request->getMethod(), $request->getUri());
				$handler($connection); // stream the rest of the body (if there is one)
			}
		}
		catch(\Exception $e) {
			
		}
	}
	
	// In general, don't try iterate the original AsyncGenerator:
	//  you'll probably get a "Generator already started" exception
});

背景(销售点)

试图将ReactiveX思维带入Hack,使用其本地的async-await异步生成器是不自然的,因为它们宣传了相反的控制方案:await-async将迭代控制权交给消费范围来管理,而ReactiveX使用回调来将迭代控制权交给Observable。实际上,ReactiveX有一个更不为人知的双胞胎,称为InteractiveX,它用IEnumerable替换了IObservable,并且更好地描述了在async-await方案中的反应式方法。Bart De Smet提供了一个关于InteractiveX考虑的有说服力(并且,根据缺乏竞争,权威)的55分钟演讲,特别是在最后的15分钟讨论缓冲。

Hack的异步生成器对于流使用来说出类拔萃,因为无需设置:HHVM有一个内置的、隐藏的调度器。然而,这些生成器在简单性上受到限制;foreach-await几乎是他们唯一的广告用例。所以,多个流的包装器实际上只能通过依次串行迭代这些流来实现concat

通过谨慎使用ConditionWaitHandle,HHReactor的Producer能够并行化、扩展和缩减流,并将丰富的InteractiveX操作符系列引入Hack的异步迭代器中。Producer还设计得尽可能无侵入:它符合AsyncIterator签名直到+T(强调协变),它与底层迭代器的行为相匹配,并且如果没有应用操作符或克隆,它几乎是无状态的。

* HHVM >3.23的兼容性

Hack的维护者已将在即将发布的HHVM >3.23中完全从严格Hack中移除析构函数。HHReactor依赖于紧迫的引用计数并在BaseProducer上定义一个自定义析构函数以确保计时正确。如果没有保证析构函数将“根本”被调用,当它暂停时,Producer就会泄露,这是一个关键问题,如果不是破坏应用程序的bug。

因此,**我不建议在HHVM >3.23上运行HHReactor的当前版本(~1.0)**。这可能会是一个艰难的补丁,如果它甚至值得的话,解决方案可能涉及手动清理,这比HHVM <3.23中现有的自动清理要脏得多。

HHReactor:包含的内容

  • BaseProducer:管理运行克隆的克隆和会计
  • Producer 扩展自 BaseProducer:交互式X运算符和对任意调度和高阶迭代器的支持。演出的明星
  • connection_factory:监听TCP流上的HTTP请求,解析头部,并产生请求体流
  • Connection 扩展自 BaseProducer:从HTTP请求中流式传输身体,并向客户端发送响应

运算符

大多数运算符与交互式X/ReactiveX规范匹配。扫描签名的最快方法是查看参考文档

主要差异

  1. “绑定”运算符 – sharememoizepublish:这些被克隆取代。
  2. debounce 运算符:由于技术挑战尚未实现,但优先级很高。
  3. defer 运算符:没有强烈的动机来实现它。
  4. never 运算符:在Hack中(目前是这样;2017-06-17),无法实现非终止的懒惰 AwaitableAsyncIterator
  5. 在自然情况下保留顺序,例如在 mapreducefilter 中。Hack规范不保护极端病态的竞态条件,其中从迭代器产生到 Producer 缓冲区的 单一 弧线被来自另一个作用域的重启迭代器的弧线级联所超越,然后获取下一个值并将其放入共享缓冲区。截至HHVM 3.19,看起来实际的异步实现不允许这样做,但没有规范,顺序保留无法保证。

Producer 的动态性

如果两个或更多作用域消费相同的流,它们可以克隆或不需要克隆 Producer

  1. [Ix的Memoize & Publish] 如果 Producer 被克隆,则缓冲区也已被克隆,因此消费者将从克隆的那一刻开始接收相同的元素。以这种方式,克隆就像交互式X的 Memoize 和ReactiveX的 Replay

Ix Memoize and Publish marble diagram

要强调的是,克隆看不到在克隆存在之前由该 Producer 产生的任何元素。以这种方式,MemoizePublish 行为只在消费者何时/克隆什么方面有所不同。从永远不会迭代的 Producer 克隆将给出 Memoize 行为。从移动 Producer 克隆将给出 Publish 行为。

但是请注意,由于消费者控制迭代,底层迭代器直到第一个克隆请求第一个元素时才开始。

行为说明:正如下面将提到的,缓冲区像 Publish 而不是 Memoize 一样管理。 Producer 在决定哪些节点被回收方面非常简单明了,因为它依赖于垃圾回收器:一旦最慢的消费者前进,节点就会被回收。然后简单地保持或放弃未开始的 Producer,分别将分别持有或不会持有从开始就有的元素。

  1. [Ix的Share] 如果 Producer 没有克隆,则所有消费者共享相同的缓冲区,因此它们直接竞争值。

Share marble diagram

行为说明:所有运算符隐式地克隆其操作数,以避免与其他运算符或原始消费者竞争值;它们都隐式地 Publish

HTTP服务器

1. connection_factory

connection_factory 在被调用时启动HTTP服务器,通过TCP套接字接受连接。它接受每个连接并将工作委托给一个单独的异步函数来解析头部,因此它产生一个 Awaitable,如果流关闭时头部不完整,它将失败。

2. Connection

适当的头部会产生一个Connection对象,作为一个异步迭代器,它可以流式传输请求体,并提供异步的write方法来响应用户。

WebSocketConnection

当识别到WebSocket请求时,可以使用该RequestConnection对象来构建一个(很可能是)子类自WebScoketConnectionRFC6455对象。它处理握手、解析帧,并将客户端的字符串转换为帧,还提供了一个异步的write_frames方法来将字符串帧发送回客户端。

参考文档

hphpdoc使得按需编译参考文档变得非常容易。

$ mkdir doc
$ ./vendor/appertly/hphpdoc/bin/hphpdoc -o doc -- src
$ # View in browser at ./doc/index.html

Producer的详细信息

缓冲的详细信息

生产者和消费者之间的时间线通过一个缓冲区和通知信号来分隔,该信号告知消费者队列中至少有一个项目。它的工作方式类似于餐厅厨房:项目被生产并排队,然后拉响“铃声”来通知工作人员在方便时尽快为消费者提供服务。

由于Hack调度器的时序和排序规则相对较弱,因此信号非常弱。值得注意的是,如果许多await语句并行排队并且可以同时恢复,Hack调度器对它们处理的顺序没有保证。Producer等待它们持有的各种迭代器,消费者等待Producer;这些都在并行排队,这受制于排序规则的弱点。为了在不使用缓冲区的情况下实现Producer,我们必须保证消费者在任何一个Producer下的迭代器产生之后立即获得控制权,这在一般情况下是不可靠的。

Producer依赖于垃圾收集器来清除由于消费者消费速度不同而积累的缓冲区。随着最慢的消费者逐步通过缓冲区,他们对缓冲区链表中最早期节点的引用被丢弃,垃圾收集器清除这些不可达的节点。

虽然这可能会改变,但PHP的工作方式迫使HHVM在一定程度上采用急切引用计数,这有助于更快地清除耗尽的节点。

任意调度和高级迭代器

/* Condensed signature */
Producer<T>::__construct(Vector<(function((function(AsyncIterator<T>): void)): AsyncIterator<T>)> $generators)

/* Expanded signature */
Producer<T>::__construct(
	Vector< /* many-to-one */
		(function( /* factories of AsyncIterators */
			(function(AsyncIterator<T>): void) /* that can append new ones at any time */
		): AsyncIterator<T>)
	> $generators
)

Producer::create是大多数情况下应用运算符到AsyncGenerator的起点。然而,Producer::create只是构造函数的一个特殊情况,它允许生成函数通过调用传递给它的“追加”函数在任何点上将其创建的迭代器合并到输出流中。这是一种有用的行为,因为它意味着**异步代码可以制造更多异步代码并运行它而不会阻塞自身**。

友情提示:追加是通过与值相同的“铃声”弱信号通知的,因此迭代器不一定会立即迭代。

flat_map最直接地使用这种行为——运算符的主体必须并行迭代Producer和从它接收到的AsyncIterator。实际上,这种并行化是通过追加函数完成的

如果不使用追加器,Producer的构造函数将把生成函数的值流合并到一个公共输出中。merge实际上就是这样实现的

友情提示:对于高阶 Producer,如 Producer<Producer<T>>,外层 Producer 不会自动克隆内层 Producer。这限制了较少,但可能会让人惊讶:外层生产者的克隆将产生相同的内层 Producer,这些内层 Producer 将被 共享,因此它们的消费者将竞争值,尽管外层生产者是冷态。

运行、暂停和提前退出

方法

您可以通过几种方式停止从 Producer 消费,每种方式对资源的影响都不同。

  1. 只是释放对其的所有引用,尽可能快地释放资源。这包括所有克隆以及来自感兴趣 Producer 操作的所有 Producer
  2. 只释放正在运行的实例/克隆,快速停止消耗资源,但可能稍后重启。
  3. 快速释放资源,但可能稍后重启
// given:
$producer = Producer::create($async_generator);

//...

$iterator_edge = $producer->get_iterator_edge(); // Awaitable<void>
$producer = null; // drop the references like they're hot
await $iterator_edge; // wait for $async_generator `next` to become available

// ...

foreach($async_generator await as $v) { /* begin iterating it again */ }

原因

在处置 Producer 时,有两个决定因素在它们成为不可达后影响其生态系统的迭代器和缓冲区

  1. 是否曾经对其、其克隆或来自其操作员的 Producer 调用了 next
  2. 它们包装了什么?
    • 其他 Producer(例如,它们是否是其他 Producer 操作的结果)?
    • AsyncGenerator
    • (值得注意的是,打开的 TCP 流会发生什么?)

Producer 考虑到暂停而设计,以满足任意停止缓冲值但保持资源以稍后恢复的需求。以下是一些有关此过程的有益方面

  1. Producer 请求第一项时,它开始“运行”。
  2. 每个 Producer 都知道正在运行的克隆数量。
  3. 分离一个正在运行的 Producer 会减少正在运行的计数。
  4. 当正在运行的计数降至 0 时,Producer
    1. 停止运行其子项(这将停止缓冲区的增长)并通过减少其子 Producer 的运行引用计数来“分离”它们。

请参阅 1. Producer::_attach;2. BaseProducer::running_countBaseProducer::this_runningBaseProducer::some_running;3. Producer::__destruct;4.1. Producer::awaitify;4.2. Producer::detach

Producer 包装的 connection_factory 迭代器会停止 缓冲,但 它不会在引用计数降至 0 之前关闭 TCP 套接字,因此该套接字的系统队列可能开始填充。再次强调,HHReactor 依赖于垃圾回收器来关闭这些套接字,并且只有当 所有connection_factory 迭代器的引用都释放(而不仅仅是运行中的引用)时。