huholoman / php-nats-streaming
1.0.1
2020-06-13 14:43 UTC
Requires
- huholoman/phpnats: ^1.0
- protobuf-php/protobuf: ^0.1.3
Requires (Dev)
- github.com/gogo/protobuf: dev-master
- phpunit/phpunit: 5.3.*
- protobuf-php/protobuf-plugin: ^0.1.2
- satooshi/php-coveralls: dev-master
- squizlabs/php_codesniffer: ~2.0
- symfony/console: ^2.8@dev
README
Introduction
A PHP client for NATS Streaming Server.
It uses phpnats under the hood and closely resembles its API.
Requirements
- PHP 5.6+
- stan
Installation
Get Composer
curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar
Add php-nats-streaming as a dependency to your project
php composer.phar require 'byrnedo/php-nats-streaming:^0.2.4'
Usage
Publish
$options = new \NatsStreaming\ConnectionOptions();
$options->setClientID("test");
$options->setClusterID("test-cluster");
$c = new \NatsStreaming\Connection($options);

$c->connect();

// Publish
$r = $c->publish('special.subject', 'some serialized payload...');

// optionally wait for the ack
$gotAck = $r->wait();
if (!$gotAck) {
    ...
}

$c->close();
Note
If you publish many messages at once, your first instinct might be to do this:
foreach ($req as $data) {
    $r = $c->publish(...);
    $gotAck = $r->wait();
    if (!$gotAck) {
        ...
    }
}
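It is actually much faster to publish everything first and wait for the acks afterwards, because the client no longer blocks on an ack before sending the next message: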
$rs = [];
foreach ($req as $data) {
    $rs[] = $c->publish(...);
}

foreach ($rs as $r) {
    $r->wait();
}
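The batched pattern above can be wrapped in a small helper that also records which messages were not acknowledged. This is a minimal sketch that relies only on the publish() and wait() calls shown above. publishBatch is a hypothetical helper, not part of the library, and it assumes wait() returns a falsy value when no ack arrives, as the `if (!$gotAck)` check in the Publish example suggests.

```php
<?php
/**
 * Hypothetical helper (not part of php-nats-streaming).
 * Publishes every payload first, then waits for the acks, and returns
 * the keys of the payloads that were not acknowledged.
 *
 * $c is expected to be a connected \NatsStreaming\Connection, but any
 * object with a compatible publish($subject, $payload) method works.
 */
function publishBatch($c, $subject, array $payloads)
{
    $requests = [];
    foreach ($payloads as $key => $payload) {
        // Send without waiting, so publishing is not serialized on acks.
        $requests[$key] = $c->publish($subject, $payload);
    }

    $failed = [];
    foreach ($requests as $key => $r) {
        // Assumption: wait() returns a falsy value if no ack was received.
        if (!$r->wait()) {
            $failed[] = $key;
        }
    }

    return $failed;
}
```

A call such as `$failed = publishBatch($c, 'special.subject', $req);` then yields the keys of `$req` that may need to be retried or logged.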
Subscribe
$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First());

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$sub->wait(1);

// not explicitly needed
$sub->unsubscribe(); // or $sub->close();

$c->close();
If you want to subscribe to multiple channels, you can use $c->wait():
...

$c->connect();

...

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);
$sub2 = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$c->wait();
Queue Group Subscribe
$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) {
    // implement
}, $subOptions);

$sub->wait(1);

// not explicitly needed
$sub->close(); // or $sub->unsubscribe();

$c->close();
Manual Ack
$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setManualAck(true);

$sub = $c->subscribe('special.subject', function ($message) {
    $message->ack();
}, $subOptions);

$sub->wait(1);

$c->close();
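With manual acks enabled, one option is to acknowledge a message only after it has been processed without error, so that a failed message stays unacknowledged. The following is a minimal sketch that relies only on the $message->ack() call shown above. ackOnSuccess and its $handler parameter are hypothetical names, not part of the library, and the choice to log and swallow the exception is an assumption about what the application wants.

```php
<?php
/**
 * Hypothetical helper (not part of php-nats-streaming).
 * Wraps a message handler so that the message is acked only if the
 * handler returns without throwing.
 */
function ackOnSuccess(callable $handler)
{
    return function ($message) use ($handler) {
        try {
            call_user_func($handler, $message);
        } catch (\Exception $e) {
            // Processing failed: log it and skip the ack, so the
            // message remains unacknowledged.
            error_log('message handler failed: ' . $e->getMessage());
            return;
        }
        $message->ack();
    };
}
```

The returned closure can be passed as the callback in the example above, for instance `$c->subscribe('special.subject', ackOnSuccess($handler), $subOptions);`, provided `$subOptions->setManualAck(true)` has been called.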
License
MIT, see LICENSE