NSQ client for PHP

0.9.1 2023-12-14 15:43 UTC

This package is auto-updated.

Last update: 2024-09-14 17:32:13 UTC



A PHP client for NSQ.


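This library follows SemVer. Until version 1.0 is released, anything may change at any time, and the public API should not be considered stable. If you want to use it before a stable release, pin an exact version rather than a version range.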

Installation

This library can be installed with Composer:

composer require nsq/nsq
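
Given the pre-1.0 stability note above, a pinned install might look like the following. This is a sketch that assumes 0.9.1 (the version shown at the top of this page) is the release you want; substitute whichever release you have actually tested against.

```shell
# Pin an exact version instead of a range such as ^0.9,
# so that a later pre-1.0 release cannot be pulled in by "composer update".
# 0.9.1 is an assumption taken from the version listed on this page.
composer require nsq/nsq:0.9.1
```

With an exact constraint, upgrading becomes an explicit step: change the version number and re-run the command once you have reviewed the changes in the new release.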

Requirements

This library requires PHP 8.0 or later.

Although not required, installing phpinnacle/ext-buffer is recommended to speed up phpinnacle/buffer.

Features

  • PUB
  • SUB
  • Feature Negotiation
  • Discovery
  • Backoff
  • TLS
  • Deflate
  • Snappy
  • Sampling
  • AUTH

Usage

Producer

use Nsq\Producer;

$producer = Producer::create(address: 'tcp://nsqd:4150');

// Publish a message to a topic
$producer->publish('topic', 'Simple message');

// Publish multiple messages to a topic (atomically) 
$producer->publish('topic', [
    'Message one',
    'Message two',
]);

// Publish a deferred message to a topic
$producer->publish('topic', 'Deferred message', delay: 5000);

Consumer

use Nsq\Consumer;
use Nsq\Message;

$consumer = Consumer::create(
    address: 'tcp://nsqd:4150', 
    topic: 'topic',
    channel: 'channel',
    onMessage: static function (Message $message): Generator {
        yield $message->touch(); // Reset the timeout for an in-flight message        
        yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)        
        yield $message->finish(); // Finish a message (indicate successful processing)        
    },
);

Lookup

use Nsq\Lookup;
use Nsq\Message;

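// Discover nsqd nodes through a single nsqlookupd instance...
$lookup = new Lookup('http://nsqlookupd0:4161');
// ...or through several nsqlookupd instances
$lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']);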

$callable = static function (Message $message): Generator {
    yield $message->touch(); // Reset the timeout for an in-flight message        
    yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process)        
    yield $message->finish(); // Finish a message (indicate successful processing)        
};

$lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable);  
$lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable);

$lookup->unsubscribe(topic: 'local', channel: 'channel');
$lookup->stop(); // Unsubscribe from all topics

Integrations

License

The MIT License (MIT). See LICENSE for more information.