raoptimus / php-nats-streaming

A PHP client for nats-streaming-server.

The canonical repository for this package appears to be gone, so the package has been frozen.

0.2.9 2019-01-18 15:41 UTC

README

Build

Master        Develop
Build Status  Build Status

Coverage

Master           Develop
Coverage Status  Coverage Status

Intro

A PHP client for NATS Streaming Server.

It uses phpnats under the hood, and its API closely resembles that library's.

Requirements

Installation

Get Composer

curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar

Add php-nats-streaming as a dependency of your project

php composer.phar require 'raoptimus/php-nats-streaming:^0.2.5'

Usage

Publish

$options = new \NatsStreaming\ConnectionOptions();
$options->setClientID("test");
$options->setClusterID("test-cluster");
$c = new \NatsStreaming\Connection($options);

$c->connect();

// Publish
$r = $c->publish('special.subject', 'some serialized payload...');

// optionally wait for the ack
$gotAck = $r->wait();
if (!$gotAck) {
    // handle the missing ack
}

$c->close();

Note

If you publish many messages at a time, you might at first do this:

foreach ($req as $data){
    $r = $c->publish(...);
    $gotAck = $r->wait();
    if (!$gotAck) {
        // handle the missing ack
    }
}

It is actually much faster to publish everything first and wait for the acks afterwards:

$rs = [];
foreach ($req as $data){
    $rs[] = $c->publish(...);
}

foreach ($rs as $r){
    $r->wait();
}
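
The pattern above can be wrapped in a small helper. This is a minimal sketch and not part of php-nats-streaming: publishAll is a hypothetical name, and it only assumes that the callable returns an object whose wait() reports whether the ack arrived, as in the examples above.

```php
<?php
/**
 * Publish every payload first, then wait for all acks.
 *
 * Hypothetical helper, not part of the library. $publish is any callable
 * that takes one payload and returns an object with a wait() method.
 *
 * @return array keys of the payloads whose ack did not arrive
 */
function publishAll(callable $publish, iterable $payloads): array
{
    // Phase 1: send everything without blocking on individual acks.
    $pending = [];
    foreach ($payloads as $key => $payload) {
        $pending[$key] = $publish($payload);
    }

    // Phase 2: collect the acks and remember which ones are missing.
    $failed = [];
    foreach ($pending as $key => $request) {
        if (!$request->wait()) {
            $failed[] = $key;
        }
    }

    return $failed;
}
```

With a connected $c it might be called as publishAll(function ($data) use ($c) { return $c->publish('special.subject', $data); }, $req). An empty result means every ack arrived.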

Subscribe

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First());

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$sub->wait(1);

// not explicitly needed
$sub->unsubscribe(); // or $sub->close();

$c->close();

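If you want to subscribe to multiple channels, you can use $c->wait() on the connection instead of waiting on each subscription: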

...

$c->connect();

...

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);
$sub2 = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$c->wait();

Queue Group Subscribe

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) {
    // implement
}, $subOptions);


$sub->wait(1);

// not explicitly needed
$sub->close(); // or $sub->unsubscribe();

$c->close();

Manual Ack

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setManualAck(true);

$sub = $c->subscribe('special.subject', function ($message) {
    $message->ack();
}, $subOptions);

$sub->wait(1);

$c->close();
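
With manual ack enabled, a common choice is to acknowledge a message only after it has been processed successfully, since NATS Streaming redelivers messages that are not acknowledged within the ack wait period. The following is a rough sketch of that idea, not part of the library: ackOnSuccess and its $onError parameter are hypothetical names, and the only thing assumed about $message is the ack() method shown above.

```php
<?php
/**
 * Wrap a message handler so that the message is acked only when the
 * handler returns without throwing.
 *
 * Hypothetical helper, not part of php-nats-streaming.
 *
 * @param callable      $process handler that receives the message
 * @param callable|null $onError optional callback, called with ($exception, $message)
 */
function ackOnSuccess(callable $process, ?callable $onError = null): callable
{
    return function ($message) use ($process, $onError) {
        try {
            $process($message);
        } catch (\Throwable $e) {
            // No ack here: the message stays unacknowledged.
            if ($onError !== null) {
                $onError($e, $message);
            }
            return;
        }

        // Processing succeeded, so confirm the message.
        $message->ack();
    };
}
```

The returned closure can be passed as the callback to $c->subscribe() in place of the inline function above, together with $subOptions->setManualAck(true).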

License

MIT, see LICENSE