rxnet / eventstore-client
Asynchronous PHP client for EventStore, with a ReactiveX flavor
3.0.2
2019-03-15 09:19 UTC
Requires
- php: ^7.2
- ext-json: *
- google/protobuf: ^3.2
- ramsey/uuid: ^3.5
- reactivex/rxphp: ^2.0
- rxnet/socket: ^0.2.0
- trafficcophp/bytebuffer: ^0.3
- voryx/event-loop: ^3.0 || ^2.0
- zendframework/zend-stdlib: ^3.2
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.14
- phpstan/phpstan: ^0.11.1
- protobuf-php/protobuf-plugin: ^0.1.3
README
Asynchronous client for the EventStore TCP API
Usage
Connection
<?php
$eventStore = new \Rxnet\EventStore\EventStore();
// Default value
$eventStore->connect('tcp://admin:changeit@localhost:1113');

$eventStore = new \Rxnet\EventStore\EventStore();
// Lazy way to connect
$eventStore->connect()
    ->subscribe(function () {
        echo "connected";
    });
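The default connection string follows the usual URI layout (scheme, credentials, host, port). As a rough illustration that does not involve the client at all, PHP's built-in `parse_url()` shows which parts that default DSN carries; the variable names are made up for this sketch, and the client's own parsing may differ.

```php
<?php
// Illustration only: split the default DSN shown above into its parts.
// Uses PHP's built-in parse_url(), not the client's own parsing.
$dsn = 'tcp://admin:changeit@localhost:1113';
$parts = parse_url($dsn);

printf(
    "scheme=%s host=%s port=%d user=%s pass=%s\n",
    $parts['scheme'],
    $parts['host'],
    $parts['port'],
    $parts['user'],
    $parts['pass']
);
```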
Write
You can write as many events as you want in one call (up to 2000)
<?php
$eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type1', ['data' => 'a'], ['worker' => 'metadata']);
$eventB = new \Rxnet\EventStore\RawEvent('event_type2', 'raw data', 'raw metadata');

$eventStore->write('category-test_stream_id', [$eventA, $eventB])
    ->subscribe(function (\Rxnet\EventStore\Data\WriteEventsCompleted $eventsCompleted) {
        echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
    });
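If you have more events than the limit mentioned above, one option is to split them into batches before writing. The following is a minimal sketch using `array_chunk()`: the 2000 figure comes from this README, the helper name `chunkEvents` is hypothetical, and plain strings stand in for event objects so the snippet runs without the client installed.

```php
<?php
// Hypothetical helper: split a list of events into batches of at most $max items.
function chunkEvents(array $events, int $max = 2000): array
{
    return array_chunk($events, $max);
}

// Strings stand in for JsonEvent / RawEvent instances here.
$events = array_map(function (int $i): string {
    return "event-$i";
}, range(1, 4500));

$batches = chunkEvents($events);

foreach ($batches as $index => $batch) {
    printf("batch %d: %d events\n", $index, count($batch));
}
```

Each batch could then be passed to `$eventStore->write()` in turn. Whether those writes are chained one after another or issued concurrently is a design choice this sketch leaves open.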
Transaction
<?php
use Rxnet\EventStore\NewEvent\JsonEvent;
// Assumed namespace, alongside WriteEventsCompleted; adjust if your version differs
use Rxnet\EventStore\Data\TransactionCommitCompleted;

$eventStore->startTransaction('category-test_stream')
    ->subscribe(
        function (\Rxnet\EventStore\Transaction $transaction) {
            $eventA = new JsonEvent('event_type', ['i' => "data"]);
            $eventB = new JsonEvent('event_type', ['i' => "data"]);
            // You can write as many as you want
            return $transaction->write([$eventA, $eventB])
                // Commit to make it work
                ->flatMap([$transaction, 'commit'])
                ->subscribe(
                    function (TransactionCommitCompleted $commitCompleted) {
                        echo "Transaction {$commitCompleted->getTransactionId()} commit completed : events from {$commitCompleted->getFirstEventNumber()} to {$commitCompleted->getLastEventNumber()} \n";
                    }
                );
        }
    );
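Subscription
Connect to the persistent subscription on $ce-category (a projection) with group my-group, then acknowledge (ack) or reject (nack) each event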
<?php
$eventStore->persistentSubscription('projection-name', 'my-group')
    ->subscribe(function (\Rxnet\EventStore\AcknowledgeableEventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
        if ($event->getNumber() % 2) {
            $event->ack();
        } else {
            $event->nack($event::NACK_ACTION_RETRY, 'Explain why');
        }
    });
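The condition in the callback above relies on PHP treating a non-zero remainder as true, so it is only a demonstration rule. A small standalone sketch of which event numbers would take which branch (the `$decisions` array is illustrative and not part of the client):

```php
<?php
// Mirrors the sample's condition: a non-zero remainder is truthy,
// so odd event numbers take the ack branch and even ones the nack branch.
$decisions = [];
foreach (range(0, 5) as $number) {
    $decisions[$number] = ($number % 2) ? 'ack' : 'nack';
}

foreach ($decisions as $number => $decision) {
    echo "$number => $decision\n";
}
```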
Watch a given stream for new events.
The subscribe callback is called whenever a new event appears
<?php
$eventStore->volatileSubscription('category-test_stream_id')
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read all events starting from position 100; once everything has been read, watch for new events (like a volatile subscription)
<?php
$eventStore->catchUpSubscription('category-test_stream_id', 100)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read
Read stream category-test_stream_id from event 0 to event 100, then complete
<?php
$eventStore->readEventsForward('category-test_stream_id', 0, 100)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read stream category-test_stream_id backward from event 100 to event 90, then complete
<?php
$eventStore->readEventsBackWard('category-test_stream_id', 100, 10)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read the details of the first event of category-test_stream_id
<?php
$eventStore->readEvent('category-test_stream_id', 0)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
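Contributing
TODO
- Append events to a stream
- Read a given stream
- Subscribe to a given stream
- Read large streams
- Persistent subscription
- Connect to a cluster
- Automatically reconnect to the master node when needed
- Reconnect and disconnect from the remote
- Transactions
- TLS connection
- Write some specs
- Create/update/delete persistent subscriptions
- Create/update/delete projections
- Delete a stream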
Protocol Buffers
If ClientMessageDtos.proto is modified, you must regenerate the PHP classes in Data:
./vendor/bin/protobuf --include-descriptors -i . -o ./src ./ClientMessageDtos.proto