rxnet/eventstore-client

Asynchronous PHP client for EventStore, with a ReactiveX flavour

3.0.2 2019-03-15 09:19 UTC



Asynchronous client for the EventStore TCP API

Usage

Connection

<?php
$eventStore = new \Rxnet\EventStore\EventStore();
// Explicit DSN; the one shown here is the default value
$eventStore->connect('tcp://admin:changeit@localhost:1113');

$eventStore = new \Rxnet\EventStore\EventStore();
// Lazy way: connect with the default DSN
$eventStore->connect()
    ->subscribe(function () { echo "connected\n"; });
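
The DSN passed to connect() packs the scheme, credentials, host and port into one string. As a rough illustration, independent of the client, PHP's built-in parse_url() can split it into its parts. The helper name describeDsn is hypothetical, and the client's own DSN handling may differ.

```php
<?php
// Illustration only: split a DSN like the one passed to connect() into parts.
// parse_url() is PHP core; describeDsn() is a hypothetical helper, not client API.
function describeDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException("Invalid DSN: {$dsn}");
    }

    return [
        'scheme' => $parts['scheme'] ?? null,
        'user'   => $parts['user'] ?? null,
        'pass'   => $parts['pass'] ?? null,
        'host'   => $parts['host'],
        'port'   => $parts['port'] ?? null, // parse_url() returns the port as an int
    ];
}

print_r(describeDsn('tcp://admin:changeit@localhost:1113'));
```

A check like this can catch a malformed DSN before any connection attempt is made.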

Write

A single write accepts as many events as you want, up to 2000.

<?php
$eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type1', ['data' => 'a'], ['worker'=>'metadata']);
$eventB = new \Rxnet\EventStore\RawEvent('event_type2', 'raw data', 'raw metadata');

$eventStore->write('category-test_stream_id', [$eventA, $eventB])
    ->subscribe(function(\Rxnet\EventStore\Data\WriteEventsCompleted $eventsCompleted) {
        echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
    });
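
Because a single write is limited to 2000 events, a larger set has to be split before writing. The sketch below uses PHP's array_chunk() for that. chunkEvents is a hypothetical helper, not part of the client, and plain integers stand in for real event objects so the example runs on its own.

```php
<?php
// Hypothetical helper (not part of the client): split events into batches
// no larger than the per-write limit stated above.
function chunkEvents(array $events, int $maxPerWrite = 2000): array
{
    if ($maxPerWrite < 1) {
        throw new InvalidArgumentException('maxPerWrite must be at least 1');
    }

    return array_chunk($events, $maxPerWrite);
}

// Stand-in payloads; in real code these would be JsonEvent/RawEvent instances.
$events = range(1, 4500);
$batches = chunkEvents($events);

foreach ($batches as $i => $batch) {
    // Each $batch could then be passed to $eventStore->write($streamId, $batch).
    echo "batch {$i}: " . count($batch) . " events\n";
}
```

With 4500 events this yields batches of 2000, 2000 and 500. Writes are asynchronous, so if ordering across batches matters, each write would need to start only after the previous one completes; the sketch does not handle that.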

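Transaction

<?php
use Rxnet\EventStore\NewEvent\JsonEvent;
// Namespace assumed to match WriteEventsCompleted in the write example
use Rxnet\EventStore\Data\TransactionCommitCompleted;
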
$eventStore->startTransaction('category-test_stream')
    ->subscribe(
        function (\Rxnet\EventStore\Transaction $transaction) {
            $eventA = new JsonEvent('event_type', ['i' => "data"]);
            $eventB = new JsonEvent('event_type', ['i' => "data"]);
            // You can write as many as you want
            return $transaction->write([$eventA, $eventB])
                // Commit to make it work
                ->flatMap([$transaction, 'commit'])
                ->subscribe(
                    function (TransactionCommitCompleted $commitCompleted) {
                        echo "Transaction {$commitCompleted->getTransactionId()} commit completed : events from {$commitCompleted->getFirstEventNumber()} to {$commitCompleted->getLastEventNumber()} \n";
                    }
                );
        }
    );

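Subscription

Connect to the persistent subscription on $ce-category (a projection) with group my-group, then acknowledge (ack) or reject (nack) each event
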

<?php
$eventStore->persistentSubscription('projection-name', 'my-group')
    ->subscribe(function(\Rxnet\EventStore\AcknowledgeableEventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
        // Demo only: ack odd-numbered events, ask for a retry on the others
        if ($event->getNumber() % 2) {
            $event->ack();
        } else {
            $event->nack($event::NACK_ACTION_RETRY, 'Explain why');
        }
    });
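
In practice the ack/nack decision usually follows from whether processing succeeded, rather than from the event number. The sketch below shows that pattern under stated assumptions: FakeAckEvent is a hypothetical stand-in for AcknowledgeableEventRecord so the code runs without a server, its NACK_ACTION_RETRY value is a placeholder, and processWithAck is not part of the client. Only the ack() and nack($action, $reason) calls mirror the example above.

```php
<?php
// Hypothetical stand-in for AcknowledgeableEventRecord so the sketch runs
// without a server; only ack(), nack() and NACK_ACTION_RETRY mirror the README.
class FakeAckEvent
{
    const NACK_ACTION_RETRY = 'retry'; // placeholder value, not the real constant
    public $log = [];

    public function ack(): void
    {
        $this->log[] = 'ack';
    }

    public function nack($action, string $reason): void
    {
        $this->log[] = "nack:{$action}:{$reason}";
    }
}

// Ack when the handler succeeds, nack with a retry request when it throws.
function processWithAck($event, callable $handler): void
{
    try {
        $handler($event);
        $event->ack();
    } catch (Throwable $e) {
        $event->nack($event::NACK_ACTION_RETRY, $e->getMessage());
    }
}

$ok = new FakeAckEvent();
processWithAck($ok, function ($event) {
    // handled successfully, nothing to do
});

$failing = new FakeAckEvent();
processWithAck($failing, function ($event) {
    throw new RuntimeException('database unavailable');
});

echo implode(',', $ok->log), "\n";      // ack
echo implode(',', $failing->log), "\n"; // nack:retry:database unavailable
```

Inside a real subscription, the callback passed to subscribe() would call processWithAck($event, $handler) with the received event.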

Watch a given stream for new events.
The subscribe callback is invoked each time a new event appears.

<?php
$eventStore->volatileSubscription('category-test_stream_id')
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read all events from position 100 onward, then keep watching for new events once caught up (like a volatile subscription)

<?php
$eventStore->catchUpSubscription('category-test_stream_id', 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read

Read forward from event 0 of stream category-test_stream_id, up to 100 events, then complete

<?php
$eventStore->readEventsForward('category-test_stream_id', 0, 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read backward from event 100 of stream category-test_stream_id, up to 10 events, then complete

<?php
$eventStore->readEventsBackWard('category-test_stream_id', 100, 10)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
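
The two read calls above each take a start event and a number. The sketch below works out which event numbers such a call would cover, assuming the third argument is a maximum event count and the start event is included. Both points are assumptions that have not been checked against the client's source, and coveredRange is a hypothetical helper.

```php
<?php
// Assumption: the third argument is a maximum event count and the start
// event is included. Neither is confirmed here against the client's source.
function coveredRange(int $start, int $count, bool $forward): array
{
    if ($start < 0 || $count < 1) {
        throw new InvalidArgumentException('start must be >= 0 and count >= 1');
    }
    if ($forward) {
        return [$start, $start + $count - 1];
    }

    // Backward reads cannot go below event 0.
    return [$start, max(0, $start - $count + 1)];
}

[$first, $last] = coveredRange(0, 100, true);
echo "forward: {$first}..{$last}\n";   // forward: 0..99

[$first, $last] = coveredRange(100, 10, false);
echo "backward: {$first}..{$last}\n";  // backward: 100..91
```

Under these assumptions a forward read can also stop earlier if the stream holds fewer events than requested.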

Read the details of the first event of category-test_stream_id

<?php
$eventStore->readEvent('category-test_stream_id', 0)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

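Contribute

TODO

  • Write events to a stream
  • Read a given stream
  • Subscribe to a given stream
  • Read large streams
  • Persistent subscription
  • Connect to a cluster
  • Automatically reconnect to the master node if needed
  • Reconnect and disconnect from the remote
  • Transaction
  • TLS connection
  • Write some specs
  • Create/update/delete persistent subscriptions
  • Create/update/delete projections
  • Delete a stream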

Protocol buffers

If ClientMessageDtos.proto is modified, you must regenerate the PHP classes in Data:

./vendor/bin/protobuf --include-descriptors -i . -o ./src ./ClientMessageDtos.proto

License

FOSSA Status