rmh-media / php-nats-streaming
PHP客户端用于nats-streaming-server。
0.3.0
2019-08-01 15:17 UTC
Requires
- protobuf-php/protobuf: ^0.1.3
- rmh-media/phpnats: ^0.9.0
Requires (Dev)
- github.com/gogo/protobuf: dev-master
- phpunit/phpunit: 5.3.*
- protobuf-php/protobuf-plugin: ^0.1.2
- satooshi/php-coveralls: dev-master
- squizlabs/php_codesniffer: ~2.0
- symfony/console: ^2.8@dev
README
简介
这是Nats Streaming Server的PHP客户端。
内部使用phpnats,其API非常相似。
需求
- php 5.6+
- stan
安装
获取composer
curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar
将php-nats-streaming作为依赖项添加到您的项目中
php composer.phar require 'rmh-media/php-nats-streaming:^0.3.0'
用法
发布
$options = new \NatsStreaming\ConnectionOptions(); $options->setClientID("test"); $options->setClusterID("test-cluster"); $c = new \NatsStreaming\Connection($options); $c->connect(); // Publish $r = $c->publish('special.subject', 'some serialized payload...'); // optionally wait for the ack $gotAck = $r->wait(); if (!$gotAck) { ... } $c->close();
注意
如果您一次发布多条消息,您可能会首先这样做
foreach ($req as $data){ $r = $c->publish(...); $gotAck = $r->wait(); if (!$gotAck) { ... } }
实际上,以下操作要快得多
$rs = []; foreach ($req as $data){ $rs[] = $c->publish(...); } foreach ($rs as $r){ $r->wait(); }
订阅
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First()); $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->unsubscribe(); // or $sub->close(); $c->close();
如果您想订阅多个通道,可以使用$c->wait()
... $c->connect(); ... $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub2 = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $c->wait();
队列组订阅
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->close(); // or $sub->unsubscribe(); $c->close();
手动确认
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setManualAck(true); $sub = $c->subscribe('special.subject', function ($message) { $message->ack(); }, $subOptions); $sub->wait(1); $c->close();
许可证
MIT,见LICENSE