sonicgd/php-nats-streaming

PHP 客户端用于 nats-streaming-server。

0.2.6 2018-11-09 08:05 UTC

This package is auto-updated.

Last update: 2024-09-09 20:39:25 UTC


README

构建

覆盖率

简介

php 客户端用于 Nats Streaming Server

底层使用 phpnats,并且其 API 非常相似。

需求

安装

获取 composer

curl -O https://getcomposer.org.cn/composer.phar && chmod +x composer.phar

将 php-nats-streaming 添加为项目依赖

php composer.phar require 'byrnedo/php-nats-streaming:^0.2.4'

使用

发布

$options = new \NatsStreaming\ConnectionOptions();
$options->setClientID("test");
$options->setClusterID("test-cluster");
$c = new \NatsStreaming\Connection($options);

$c->connect();

// Publish
$r = $c->publish('special.subject', 'some serialized payload...');

// optionally wait for the ack
$gotAck = $r->wait();
if (!$gotAck) {
    ...
}

$c->close();

注意

如果同时发布多条消息,最初你可能这样做

foreach ($req as $data){
    $r = $c->publish(...);
    $gotAck = $r->wait();
    if (!$gotAck) {
        ...
    }
}

实际上 非常 快速的做法是以下这样

$rs = [];
foreach ($req as $data){
    $rs[] = $c->publish(...);
}

foreach ($rs as $r){
    $r->wait();
}

订阅

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First());

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$sub->wait(1);

// not explicitly needed
$sub->unsubscribe(); // or $sub->close();

$c->close();

如果你想订阅多个频道,可以使用 $c->wait()

...

$c->connect();

...

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);
$sub2 = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$c->wait();

队列组订阅

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) {
    // implement
}, $subOptions);


$sub->wait(1);

// not explicitly needed
$sub->close(); // or $sub->unsubscribe();

$c->close();

手动确认

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setManualAck(true);

$sub = $c->subscribe('special.subject', function ($message) {
    $message->ack();
}, $subOptions);

$sub->wait(1);

$c->close();

许可证

MIT,见 LICENSE