tanatoss/nsqphp

该软件包最新版本(dev-master)没有可用的许可信息。

NSQ 的 PHP 客户端

dev-master 2019-12-19 09:52 UTC

This package is auto-updated.

Last update: 2024-09-19 20:33:40 UTC


README

PHP 客户端用于 NSQ

注意

我不能再维护它了 - 对未处理的 PR 表示歉意。如果有人想接管,我很乐意转让所有权。通过 @davegardnerisme 发送给我消息。

NSQ 基础知识

您可以通过 Github 上的 readme 或通过描述它的 Bitly 博客文章 了解 NSQ 的全部内容。每个项目文件夹中提供了关于 nsqd、nsqlookupd 的更多详细信息。

我学到的一些东西

  • 集群仅提供在 nsqlookupd 将发现哪些机器托管特定主题的消息的意义上。要从集群中消费,您只需询问一个 nslookupd 实例在哪里可以找到消息,然后连接到它告诉您的每个 nsqd(这是 nsq 优点之一)。
  • 由于每个 nsqd 实例都是隔离的,因此 HA(对于发布)很容易;您可以简单地连接到 任何 并发布您的消息(我已经将其集成到客户端中)。
  • 通过简单地写入多个 nsqd 并在订阅时去重,提供弹性(我已经将其集成到客户端中)。
  • nsq 不是作为 工作队列(用于长期运行的任务)设计的。默认的 msg-timeout 设置为 60,000ms(60 秒)。这是 nsq 将自动认为消息失败并重新入队之前的时间。我们的“工作”应该比这少得多。此外,PHP 是一种阻塞语言,尽管我们使用的是非阻塞 IO 事件循环,但您在处理消息时所做的任何工作都会阻止客户端回复任何心跳等。

安装

您可以通过 composer 将 nsqphp 添加到您的项目中。只需将以下内容添加到您的 composer.json 中。

{
    ...
    "require": {
        ...
        "tanatoss/nsqphp": "dev-master"
    }
    ...
}

您也可以简单地将其克隆到您的项目中

git clone git://github.com/tanatoss/nsqphp.git
cd nsqphp
git submodule update --init --recursive

要在项目中使用 nsqphp,只需包含 bootstrap.php 文件,或通过 composer 设置自动加载。设计使其适用于依赖注入容器(所有依赖项都是通过构造函数注入),尽管您可以在使用时手动设置依赖项。

测试它

按照 入门指南 在本地主机上安装 nsq。

发布一些事件

php cruft/test-pub.php 10

在一个壳中启动一个订阅者

php cruft/test-sub.php mychannel > /tmp/processed-messages

然后在另一个壳中跟踪重定向的 STDOUT,以便您可以看到接收和处理的消息

tail -f /tmp/processed-messages

注意

在这些测试中,我首先发布,因为我还没有让客户端自动重新发现具有给定主题消息的节点;因此,如果您先订阅,则找不到具有该主题消息的节点。

其他测试

多个通道

博客文章描述了一个通道

| 每个通道都接收主题中所有消息的副本。实际上,通道映射到消费主题的下游服务。

因此,每个主题中的每条消息都将交付到每个通道。

启动两个具有不同通道的订阅者(每个壳中一个)

php cruft/test-sub.php mychannel
php cruft/test-sub.php otherchannel

发布一些消息

php cruft/test-pub.php 10

每条消息都将交付到每个通道。值得注意的是,API 允许您在同一个过程中订阅多个主题/通道。

多个nsqds

设置一组运行着 nsqdnsqlookupd 服务的服务器,主机名为 nsq1nsq2 ... 现在向这两个都发布大量消息。

php cruft/test-pub.php 10 nsq1
php cruft/test-pub.php 10 nsq2

现在进行订阅

php cruft/test-sub.php mychannel > /tmp/processed-messages

您将收到20条消息。

弹性投递

与之前的测试相同,但这次我们将同一消息投递到两个 nsqd 实例,然后在订阅时去重。

php cruft/test-pub.php 10 nsq1,nsq2
php cruft/test-sub.php mychannel > /tmp/processed-messages

这次您应该只收到 10条消息

待办事项

  • 使用回退策略重新入队失败的消息(目前仅支持简单的固定延迟重新入队策略)
  • 持续评估哪些节点包含特定主题的消息(即已订阅的主题)并为这些客户端建立新连接(通过事件循环定时器)

PHP客户端接口

消息

消息被封装在 nsqphp\Message\Message 类中,并在代码中通过接口引用(因此您可以自己实现)。

接口

public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();

发布

客户端支持向 N 个 nsqd 服务器发布消息,必须通过主机名明确指定。与订阅不同,没有通过 nslookupd 查找主机名的功能(而且我们可能也不希望这么做以提高速度)。

最小化方法

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo('localhost')
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

由您决定如何编码负载(例如:JSON)。

高可用性发布

    $nsq = new nsqphp\nsqphp;
    $nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), nsqphp\nsqphp::PUB_QUORUM)
        ->publish('mytopic', new nsqphp\Message\Message('some message payload'));

我们将需要 publishTo nsqd 守护进程的多数派响应来考虑此操作成功(目前这是按顺序发生的)。这假设我在三个主机上运行了3个 nsqd,这些主机可以通过 nsq1 等进行访问。

这种技术将导致消息被记录两次,这将需要在订阅时去重。

订阅

客户端支持从 N 个 nsqd 服务器订阅消息,每个服务器将从一个或多个 nslookupd 服务器自动发现。这种方式是,nslookupd 能够提供自动发现节点列表,这些节点托管特定主题的消息。此功能使我们的客户端无需知道消息在哪里。

因此,在订阅时,我们首先需要初始化我们的查找服务对象

    $lookup = new nsqphp\Lookup\Nsqlookupd;

或者

    $lookup = new nsqphp\Lookup\Nsqlookupd('nsq1,nsq2');

然后我们可以使用它进行订阅

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', function($msg) {
        echo $msg->getId() . "\n";
        })->run();

警告:如果我们的回调抛出任何异常,则不会使用这些设置重试消息 - 请继续阅读以了解更多信息。

或者更符合PHP风格的写法

    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        echo $msg->getId() . "\n";
    }

我们可以订阅多个频道/流

    $lookup = new nsqphp\Lookup\Nsqlookup;
    $nsq = new nsqphp\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->subscribe('othertopic', 'somechannel', 'msgCallback')
        ->run();

重试失败的消息

PHP客户端会捕获回调中发生的任何抛出的异常,然后要么(a)重试,要么(b)丢弃消息。通常您不会想丢弃消息。

为了解决这个问题,我们需要一个 重新入队策略 - 这是以实现 nsqphp\RequeueStrategy\RequeueStrategyInterface 的任何对象的形式。

    public function shouldRequeue(MessageInterface $msg);

客户端目前提供了一种,固定延迟策略

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, NULL, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

订阅时的去重

回想一下,为了实现高可用性,我们在发布时简单地复制到两个不同的 nsqd 服务器。为了执行去重,我们只需要提供一个实现 nsqphp\Dedupe\DedupeInterface 的对象。

public function containsAndAdd($topic, $channel, MessageInterface $msg);

PHP客户端为在订阅时去重消息提供了两种机制。这两种机制都是基于 与布隆过滤器相反的技术。一个维护一个PHP数组作为哈希表(因此绑定到单个进程);另一个调用Memcached,因此可以在多个进程之间共享数据结构。

我们可以这样使用它

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

您可以在我的博客上了解更多关于去重的内容 ,然而,以下几点值得注意

  • 使用Memcached去重后,我们可以愉快地启动N个进程来订阅同一个主题和频道,并且只处理一次消息。
  • 去重不可保证(实际上远非如此)——提供的实现基于有损哈希表,因此在性能上具有概率性。对于在同一时间输入的事件,它们通常表现可接受(并且可以通过权衡内存使用和去重能力进行调节)
  • nsq 的设计理念是基于幂等订阅者——例如:您的订阅者必须能够处理重复的消息(将数据写入 Cassandra 是一个很好地处理重复执行的系统示例)。

日志记录

最后一个可选的依赖项是一个日志记录器,以实现 nsqphp\Logger\LoggerInterface 的某些对象的形式(据我所知,PHP 没有提供标准日志接口)

    public function error($msg);
    public function warn($msg);
    public function info($msg);
    public function debug($msg);

PHP 客户端自带一个日志记录器,将所有日志信息输出到 STDERR。将所有这些组合在一起,我们就会得到类似 test-sub.php 文件的东西

    $requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
    $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new nsqphp\Lookup\Nsqlookupd;
    $logger = new nsqphp\Logger\Stderr;
    $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, logger);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

设计日志

  • 主客户端基于事件循环(由 React PHP 驱动)来实现,以便我们能够处理多个 nsqd 实例的多个连接