sonicgd / php-nats-streaming
PHP 客户端用于 nats-streaming-server。
0.2.6
2018-11-09 08:05 UTC
Requires
- protobuf-php/protobuf: ^0.1.3
- repejota/nats: dev-develop
Requires (Dev)
- github.com/gogo/protobuf: dev-master
- phpunit/phpunit: 5.3.*
- protobuf-php/protobuf-plugin: ^0.1.2
- satooshi/php-coveralls: dev-master
- squizlabs/php_codesniffer: ~2.0
- symfony/console: ^2.8@dev
This package is auto-updated.
Last update: 2024-09-09 20:39:25 UTC
README
构建
覆盖率
简介
php 客户端用于 Nats Streaming Server。
底层使用 phpnats,并且其 API 非常相似。
需求
- php 5.6+
- stan
安装
获取 composer
curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar
将 php-nats-streaming 添加为项目依赖
php composer.phar require 'byrnedo/php-nats-streaming:^0.2.4'
使用
发布
$options = new \NatsStreaming\ConnectionOptions(); $options->setClientID("test"); $options->setClusterID("test-cluster"); $c = new \NatsStreaming\Connection($options); $c->connect(); // Publish $r = $c->publish('special.subject', 'some serialized payload...'); // optionally wait for the ack $gotAck = $r->wait(); if (!$gotAck) { ... } $c->close();
注意
如果同时发布多条消息,最初你可能这样做
foreach ($req as $data){ $r = $c->publish(...); $gotAck = $r->wait(); if (!$gotAck) { ... } }
实际上 非常 快速的做法是以下这样
$rs = []; foreach ($req as $data){ $rs[] = $c->publish(...); } foreach ($rs as $r){ $r->wait(); }
订阅
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First()); $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->unsubscribe(); // or $sub->close(); $c->close();
如果你想订阅多个频道,可以使用 $c->wait()
... $c->connect(); ... $sub = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $sub2 = $c->subscribe('special.subject', function ($message) { // implement }, $subOptions); $c->wait();
队列组订阅
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) { // implement }, $subOptions); $sub->wait(1); // not explicitly needed $sub->close(); // or $sub->unsubscribe(); $c->close();
手动确认
$options = new \NatsStreaming\ConnectionOptions(); $c = new \NatsStreaming\Connection($options); $c->connect(); $subOptions = new \NatsStreaming\SubscriptionOptions(); $subOptions->setManualAck(true); $sub = $c->subscribe('special.subject', function ($message) { $message->ack(); }, $subOptions); $sub->wait(1); $c->close();
许可证
MIT,见 LICENSE