此包已被弃用且不再维护。未建议替代包。
此包的最新版本(dev-master)没有可用的许可证信息。

NSQ的PHP客户端

dev-master 2017-12-15 13:32 UTC

This package is not auto-updated.

Last update: 2020-03-29 15:19:36 UTC


README

PHP客户端用于NSQ

注意

我无法再维护此项目了 - 为忽视的PR表示歉意。如果有人想接管,我很乐意转让所有权。通过 @davegardnerisme 发送给我消息。

NSQ基础知识

您可以通过GitHub上的readmeBitly博客文章了解NSQ的更多信息。在项目中的每个文件夹内都提供了关于nsqd、nsqlookupd的更多详细信息。

以下是我学到的几点

  • 集群仅在nsqlookupd发现特定主题的消息所在机器的意义上提供。要从集群中消费,您只需询问一个nslookupd实例在哪里可以找到消息,然后连接到它告诉您的每个nsqd(这是使nsq变得好的因素之一)。
  • 由于每个nsqd实例都是隔离的,因此HA(对于pub)很容易;您只需连接到任何实例并发送消息(我已经将其构建到客户端中)。
  • 通过简单地写入多个nsqd并在订阅时去重,提供了容错性(我已经将其构建到客户端中)。
  • NSQ不是作为工作队列(用于长时间运行的任务)而设计的。默认设置msg-timeout为60,000ms(60秒)。这是nsq自动认为消息已失败并因此重新入队之前的时间。我们的“工作”应该比这短得多。此外,PHP是一种阻塞语言,尽管我们正在使用非阻塞IO事件循环,但您处理消息所做的任何工作都会阻止客户端能够回复任何心跳等。

安装

通过composer将nsqphp添加到您的项目。只需在composer.json中添加以下内容。

{
    ...
    "require": {
        ...
        "davegardnerisme/nsqphp": "dev-master"
    }
    ...
}

您也可以简单地将其克隆到您的项目中

git clone git://github.com/davegardnerisme/nsqphp.git
cd nsqphp
git submodule update --init --recursive

要在项目中使用nsqphp,只需包含bootstrap.php文件,或通过composer设置自动加载。设计使其适合依赖注入容器(所有依赖项都是通过构造函数注入的),尽管您也可以在需要时手动设置依赖项。

测试

按照入门指南在本地主机上安装nsq。

发布一些事件

php cruft/test-pub.php 10

在一个shell中启动一个订阅者

php cruft/test-sub.php mychannel > /tmp/processed-messages

然后在另一个shell中跟踪重定向的STDOUT,以便您可以看到接收和处理的消息

tail -f /tmp/processed-messages

注意

在这些测试中,我首先发布的是第一条,因为我还没有让客户端自动重新发现给定主题的消息节点;因此,如果你先订阅,将找不到任何节点具有该主题的消息。

其他测试

多个频道

博客文章描述了一个频道

| 每个频道都接收该主题所有消息的副本。在实践中,一个频道映射到一个消费该主题的下游服务。

因此,每个topic中的消息都将被发送到每个channel

启动两个使用不同频道(每个shell中一个)的订阅者

php cruft/test-sub.php mychannel
php cruft/test-sub.php otherchannel

发布一些消息

php cruft/test-pub.php 10

每个消息都将发送到每个频道。还值得注意的是,API允许你在同一进程中订阅多个主题/频道。

多个nsqds

设置一些运行nsqdnsqlookupd的服务器,主机名为nsq1nsq2等。现在向它们都发布一些消息。

php cruft/test-pub.php 10 nsq1
php cruft/test-pub.php 10 nsq2

现在订阅

php cruft/test-sub.php mychannel > /tmp/processed-messages

你将收到20条消息。

弹性投递

与之前的测试相同,但这次我们将相同的消息发送到两个nsqd实例,然后在订阅时去重。

php cruft/test-pub.php 10 nsq1,nsq2
php cruft/test-sub.php mychannel > /tmp/processed-messages

这次你应该只收到10条消息

待办事项

  • 使用回退策略重新入队失败的消息(目前只有简单的固定延迟重新入队策略)
  • 持续评估哪些节点包含给定主题的消息(即已订阅的),并为这些客户端建立新连接(通过事件循环定时器)

PHP客户端接口

消息

消息由nsqphp\Message\Message类封装,并在代码中通过接口引用(因此你可以实现自己的)。

接口

public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();

发布

客户端支持向N个nsqd服务器发布消息,这些服务器必须通过主机名显式指定。与订阅不同,没有通过nslookupd查找主机名的功能(而且我们可能也不希望这样,为了速度)。

最小方法

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo('localhost')
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

如何对负载进行编码(例如:JSON)取决于你。

高可用性发布

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), nsqphp\nsqphp::PUB_QUORUM)
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

我们需要从publishTo nsqd守护进程中的法定多数响应来认为此操作成功(目前这是按顺序发生的)。这假设我在三个主机上有三个运行的nsqd,这些主机可以通过nsq1等访问。

这种技术将记录消息两次,这将需要在订阅时去重。

订阅

客户端支持从N个nsqd服务器订阅消息,每个服务器将从一或多个nslookupd服务器自动发现。这是通过nslookupd能够提供自动发现节点的列表来实现的,这些节点为给定主题托管消息。此功能使我们的客户端无需知道消息的位置。

因此,在订阅时,我们首先需要初始化我们的查找服务对象

    $lookup = new nsqphp\Lookup\Nsqlookupd;

或者也可以

    $lookup = new nsqphp\Lookup\Nsqlookupd('nsq1,nsq2');

然后我们可以用这个来订阅

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', function($msg) {
        echo $msg->getId() . "\n";
        })->run();

警告:如果我们的回调抛出任何异常,这些设置将不会重试消息 - 继续阅读了解更多。

或者稍微模仿一下PHP (?)的风格

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        echo $msg->getId() . "\n";
    }

我们还可以订阅多个频道/流

    $lookup = new nsqphp\Lookup\Nsqlookup;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->subscribe('othertopic', 'somechannel', 'msgCallback')
        ->run();

重试失败的消息

PHP客户端会捕获回调中发生的任何抛出的异常,然后要么(a)重试,要么(b)丢弃消息。通常你不会想丢弃消息。

为了解决这个问题,我们需要一个重新入队策略 - 这是以实现nsqphp\RequeueStrategy\RequeueStrategyInterface的任何对象的形式。

    public function shouldRequeue(MessageInterface $msg);

客户端目前附带了一个;一个固定延迟策略。

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, NULL, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

订阅时的去重

回想一下,为了实现高可用性(HA),我们在两个不同的nsqd服务器上简单地进行了发布时的复制。为了进行去重,我们只需要提供一个实现nsqphp\Dedupe\DedupeInterface的对象。

public function containsAndAdd($topic, $channel, MessageInterface $msg);

PHP客户端提供了两种在订阅时去重消息的机制。两者都是基于bloom filter的相反。一个是使用PHP数组作为哈希表(因此绑定到单个进程);另一个调用Memcached,因此可以在多个进程之间共享数据结构。

我们可以这样使用

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

你可以在我的博客上了解更多关于去重的信息,但是以下内容值得记住

  • 使用Memcached去重,然后我们可以愉快地启动N个进程来订阅同一个主题和频道,并且只处理一次消息。
  • 去重不是保证的(实际上远非如此) - 提供的实现基于损失性哈希表,因此在性能上是概率性的。对于同时下推的事件,它们通常可以接受(并且可以根据内存使用和去重能力进行权衡)。
  • nsq的设计基于幂等订阅者的概念 - 例如,你的订阅者必须能够处理处理重复的消息(例如,写入Cassandra是一个能够很好地处理执行两次的系统)。

日志记录

最后一个可选的依赖项是一个日志记录器,形式为某些实现nsqphp\Logger\LoggerInterface的对象(据我所知,PHP没有提供标准日志接口)。

    public function error($msg);
    public function warn($msg);
    public function info($msg);
    public function debug($msg);

PHP客户端附带了一个日志记录器,将所有日志信息输出到STDERR。将这些内容组合起来,我们就会有一个类似于test-sub.php文件的东西。

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $logger = new nsqphp\Logger\Stderr;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, logger);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

设计日志

  • 主客户端基于事件循环(由React PHP提供动力),使我们能够处理对多个nsqd实例的多个连接。