raoptimus / php-nats-streaming
A PHP client for nats-streaming-server.
The official repository for this package appears to be gone, so the package has been frozen.
0.2.9
2019-01-18 15:41 UTC
Requires
- protobuf-php/protobuf: ^0.1.3
- repejota/nats: dev-master#7164c9247eb1b49af5f0fed0c9d2e46080b9b71e@dev
Requires (Dev)
- github.com/gogo/protobuf: dev-master
- phpunit/phpunit: 5.3.*
- protobuf-php/protobuf-plugin: ^0.1.2
- satooshi/php-coveralls: dev-master
- squizlabs/php_codesniffer: ~2.0
- symfony/console: ^2.8@dev
README
Introduction
A PHP client for the NATS Streaming server.
It uses phpnats under the hood, and its API closely resembles that of phpnats.
Requirements
- PHP 5.6+
- stan (a NATS Streaming server)
Installation
Get composer
curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar
Add php-nats-streaming as a dependency of your project
php composer.phar require 'raoptimus/php-nats-streaming:^0.2.5'
Usage
Publish
    $options = new \NatsStreaming\ConnectionOptions();
    $options->setClientID("test");
    $options->setClusterID("test-cluster");
    $c = new \NatsStreaming\Connection($options);
    $c->connect();

    // Publish
    $r = $c->publish('special.subject', 'some serialized payload...');

    // optionally wait for the ack
    $gotAck = $r->wait();
    if (!$gotAck) {
        ...
    }

    $c->close();
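Note
If you publish many messages at once, you might initially write this:
    foreach ($req as $data) {
        $r = $c->publish(...);
        $gotAck = $r->wait();
        if (!$gotAck) {
            ...
        }
    }
The following is much faster in practice, because every message is sent before the code waits on any ack:
    $rs = [];
    foreach ($req as $data) {
        $rs[] = $c->publish(...);
    }
    foreach ($rs as $r) {
        $r->wait();
    }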
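The batched pattern discards the result of each wait(). The following is a minimal sketch, assuming only the publish() and wait() calls shown in this README, that also collects the payloads whose acks did not arrive so they can be retried. The helper name publishBatch is hypothetical and not part of the library.

```php
<?php
/**
 * Hypothetical helper (not part of php-nats-streaming): publish every payload
 * first, then wait for the acks, and return the payloads that were not acked.
 *
 * Assumes $connection->publish($subject, $payload) returns an object whose
 * wait() returns a truthy value once the ack has arrived.
 */
function publishBatch($connection, $subject, array $payloads)
{
    $pending = [];
    // Phase 1: send everything without blocking on individual acks.
    foreach ($payloads as $key => $payload) {
        $pending[$key] = $connection->publish($subject, $payload);
    }

    $failed = [];
    // Phase 2: collect the acks and keep the payloads that were not acknowledged.
    foreach ($pending as $key => $request) {
        if (!$request->wait()) {
            $failed[$key] = $payloads[$key];
        }
    }

    return $failed;
}
```

Called as `$failed = publishBatch($c, 'special.subject', $req);`, an empty result means every message was acknowledged. Keys of the input array are preserved, so a failed payload can be matched back to its source.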
Subscribe
    $options = new \NatsStreaming\ConnectionOptions();
    $c = new \NatsStreaming\Connection($options);
    $c->connect();

    $subOptions = new \NatsStreaming\SubscriptionOptions();
    $subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First());

    $sub = $c->subscribe('special.subject', function ($message) {
        // implement
    }, $subOptions);

    $sub->wait(1);

    // not explicitly needed
    $sub->unsubscribe(); // or $sub->close();

    $c->close();
If you want to subscribe to multiple channels, you can use $c->wait()
    ...
    $c->connect();
    ...
    $sub = $c->subscribe('special.subject', function ($message) {
        // implement
    }, $subOptions);
    $sub2 = $c->subscribe('special.subject', function ($message) {
        // implement
    }, $subOptions);

    $c->wait();
Queue Group Subscribe
    $options = new \NatsStreaming\ConnectionOptions();
    $c = new \NatsStreaming\Connection($options);
    $c->connect();

    $subOptions = new \NatsStreaming\SubscriptionOptions();
    $sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) {
        // implement
    }, $subOptions);

    $sub->wait(1);

    // not explicitly needed
    $sub->close(); // or $sub->unsubscribe();

    $c->close();
Manual Ack
    $options = new \NatsStreaming\ConnectionOptions();
    $c = new \NatsStreaming\Connection($options);
    $c->connect();

    $subOptions = new \NatsStreaming\SubscriptionOptions();
    $subOptions->setManualAck(true);

    $sub = $c->subscribe('special.subject', function ($message) {
        $message->ack();
    }, $subOptions);

    $sub->wait(1);

    $c->close();
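With manual acks enabled, a common approach is to acknowledge a message only after it has been processed successfully, leaving failed messages unacknowledged. The following is a minimal sketch, assuming only the $message->ack() call shown in this README; ackOnSuccess is a hypothetical wrapper and not part of the library.

```php
<?php
/**
 * Hypothetical wrapper (not part of php-nats-streaming): returns a callback
 * that runs $handler and acks the message only if $handler did not throw.
 */
function ackOnSuccess(callable $handler)
{
    return function ($message) use ($handler) {
        try {
            $handler($message);
        } catch (\Exception $e) {
            // Processing failed: skip the ack so the message stays unacknowledged.
            error_log('message handling failed: ' . $e->getMessage());
            return;
        }
        // Processing succeeded: acknowledge the message.
        $message->ack();
    };
}
```

The wrapped callback can be passed wherever the examples pass a closure, for instance `$c->subscribe('special.subject', ackOnSuccess($handler), $subOptions);` where $handler is your own processing function.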
License
MIT, see LICENSE