per3evere/nsqphp

NSQ的Laravel && Lumen PHP客户端

v0.1.3 2020-01-07 03:49 UTC

This package is auto-updated.

Last update: 2024-09-07 13:27:24 UTC


README

PHP客户端,用于NSQ

该客户端针对 Laravel 框架做了一些事情。

NSQ基础

您可以通过Github上的readme了解NSQ的所有信息,或者通过Bitly博客文章了解它。每个项目文件夹内都提供了关于nsqd、nsqlookupd的更多详细信息。

我所学到的

  • 集群仅在nsqlookupd能够发现特定主题的消息宿主机器的意义上提供。要从一个集群中消费,你只需询问一个实例的nslookupd消息在哪里,然后连接到它告诉你的每一个nsqd(这是nsq好的一个原因)。
  • 由于每个nsqd实例都是隔离的,因此HA(对于发布)很容易实现;你可以简单地将消息发布到任何一个实例(我已经将此功能集成到客户端中)。
  • 通过简单地写入多个nsqd并然后在订阅时去重,提供了弹性(我已经将此功能集成到客户端中)。
  • nsq不是作为工作队列(用于长时间运行的任务)开箱设计的。默认的msg-timeout设置为60,000ms(60秒)。这是nsq自动将消息视为失败并重新入队之前的时间。我们的“工作”应该比这少得多。此外,PHP是一种阻塞语言,尽管我们正在使用非阻塞IO事件循环,但你处理消息所做的任何工作都会阻塞客户端,使其无法回复任何心跳等。

安装

nsqphp可以通过composer添加到您的项目中。只需将以下内容添加到您的composer.json中。

{
    ...
    "require": {
        ...
        "per3evere/nsqphp": "dev-master"
    }
    ...
}

您也可以直接将其克隆到您的项目中

git clone git://github.com/persevereVon/nsqphp.git

要在项目中使用nsqphp,只需通过composer设置自动加载。设计本身适合依赖注入容器(所有依赖项都是构造函数注入),尽管您可以在使用时手动设置依赖项。

测试

按照入门指南在本地安装nsq。

发布一些事件

php cruft/test-pub.php 10

在一个shell中启动一个订阅者

php cruft/test-sub.php mychannel > /tmp/processed-messages

然后在另一个shell中跟踪重定向的STDOUT,以便您可以看到接收和处理的消息

tail -f /tmp/processed-messages

注意

在这些测试中,我首先发布,因为我还没有让客户端自动重新发现给定主题的消息宿主节点;因此,如果您首先订阅,则找不到具有主题消息的节点。

其他测试

多个通道

博客文章描述了通道

| 每个通道都会收到所有主题消息的副本。在实际中,通道映射到消费主题的下游服务。

因此,主题中的每条消息都将被交付到每个通道。

启动两个具有不同通道的订阅者(每个shell中一个)

php cruft/test-sub.php mychannel
php cruft/test-sub.php otherchannel

发布一些消息

php cruft/test-pub.php 10

每条消息都将被交付到每个通道。还值得注意的是,API允许您在同一个进程中订阅多个主题/通道。

多个nsqds

配置多个运行着 nsqdnsqlookupd 的服务器,主机名为 nsq1nsq2 ... 然后,向它们发布一批消息。

php cruft/test-pub.php 10 nsq1
php cruft/test-pub.php 10 nsq2

现在进行订阅

php cruft/test-sub.php mychannel > /tmp/processed-messages

您将收到 20 条消息。

弹性投递

与之前的测试相同,但这次我们将同一消息投递给两个 nsqd 实例,然后在订阅时去重。

php cruft/test-pub.php 10 nsq1,nsq2
php cruft/test-sub.php mychannel > /tmp/processed-messages

这次您应该只收到 10 条消息

待办事项

  • 使用退避策略重新排队失败的消息(目前只有简单的固定延迟重新排队策略)
  • 持续评估哪些节点包含特定主题的消息(即已订阅的主题),并为这些客户端建立新连接(通过事件循环计时器)

PHP 客户端接口

消息

消息由 Per3evere\Nsq\Message\Message 类封装,并在代码中通过接口引用(因此您可以实现自己的)。

接口

public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();

发布

客户端支持将消息发布到 N 个 nsqd 服务器,必须通过主机名显式指定。与订阅不同,没有通过 nslookupd 查找主机名的功能(而且我们可能也不希望这样做以提高速度)。

最小化方法

    // 原生的方式
    $nsq = new Per3evere\Nsq\nsqphp;
    $nsq->publishTo('localhost')
        ->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));

    // Laravel 方式
    app('nsq')->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));

决定如何编码您的有效载荷(例如:JSON)由您自己决定。

高可用性发布

    $nsq = new Per3evere\Nsq\nsqphp;
    $nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), Per3evere\Nsq\nsqphp::PUB_QUORUM)
        ->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));

我们需要 publishTo nsqd 守护进程的多数派响应才能认为此操作成功(目前这是按顺序发生的)。这假设我在三个主机上运行了 3 个 nsqd,这些主机可通过 nsq1 等访问。

此技术将记录消息两次,这需要订阅时的去重。

订阅

客户端支持从 N 个 nsqd 服务器订阅,每个服务器将自动从一个或多个 nslookupd 服务器发现。这是通过 nslookupd 能够提供包含特定主题消息的自动发现节点列表来实现的。此功能使我们的客户端无需知道消息的位置。

因此,在订阅时,我们首先需要初始化我们的查找服务对象

    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;

或者,另一种选择

    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd('nsq1,nsq2');

然后我们可以使用它来订阅

    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
    $nsq = new Per3evere\Nsq\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', function($msg) {
        echo $msg->getId() . "\n";
        })->run();

警告:如果我们的回调抛出任何异常,则不会使用这些设置重试这些消息 - 继续阅读以了解更多信息。

或者稍微有点像 PHP (?) 的风格

    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
    $nsq = new Per3evere\Nsq\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        echo $msg->getId() . "\n";
    }

我们还可以订阅多个频道/流

    $lookup = new Per3evere\Nsq\Lookup\Nsqlookup;
    $nsq = new Per3evere\Nsq\nsqphp($lookup);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->subscribe('othertopic', 'somechannel', 'msgCallback')
        ->run();

Laravel 方式订阅

由于订阅处理可能有很多,但是放到一个文件中并不合理。我们可以创建一个代码目录来存放订阅类,该订阅类继承自 Per3evere\Nsq\Subscribetopic 属性对应订阅的主题,channel 属性对应订阅的频道,callback 方法对应回调方法。

例如,现在有两个订阅需求 SubscribeASubscribeB,首先建立这两个文件

app/Api/V1/Subscribes/SubscribeA.php:

<?php

namespace App\Api\V1\Subscribes;

use Per3evere\Nsq\Subscribe;
use Per3evere\Nsq\Message\Message;

class SubscribeA extends Subscribe
{
    /**
     * 订阅的主题.
     *
     * @var string
     */
    protected $topic = 'test';

    /**
     * 订阅的频道.
     *
     * @var string
     */
    protected $channel = 'ch';

    /**
     * 监听消息回调处理
     *
     * @return void
     */
    public function callback(Message $msg)
    {
        var_dump($msg);
    }
}

app/Api/V1/Subscribes/SubscribeB.php:

<?php

namespace App\Api\V1\Subscribes;

use Per3evere\Nsq\Subscribe;
use Per3evere\Nsq\Message\Message;

class SubscribeB extends Subscribe
{
    /**
     * 订阅的主题.
     *
     * @var string
     */
    protected $topic = 'test';

    /**
     * 订阅的频道.
     *
     * @var string
     */
    protected $channel = 'ch';

    /**
     * 监听消息回调处理
     *
     * @return void
     */
    public function callback(Message $msg)
    {
        var_dump($msg);
    }
}

然后需要在 nsq.php 配置文件中填写配置项:

    /*
    |--------------------------------------------------------------------------
    | 订阅类列表
    |--------------------------------------------------------------------------
    |
    | 所有需要启动的订阅类,需继承 Per3evere\Nsq\Subscribe 抽象类
    |
    */
    'subscribes' => [
        App\Api\V1\Subscribes\SubscribeA::class,
        App\Api\V1\Subscribes\SubscribeB::class,
    ],

最后直接执行 php artisan nsq,监听程序就开始正常执行了。

重试失败的消息

默认采用 Per3evere\Nsq\RequeueStrategy\FixedDelay 策略,最多尝试 5 次,每次延迟 2 秒。

PHP 客户端将捕获回调中抛出的任何异常,然后要么(a)重试,要么(b)丢弃消息。通常您不希望丢弃消息。

为了解决这个问题,我们需要一个 重试策略 - 这是以任何实现了 Per3evere\Nsq\RequeueStrategy\RequeueStrategyInterface 的对象的形式。

    public function shouldRequeue(MessageInterface $msg);

客户端目前附带了一个;固定延迟策略

    $requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
    $nsq = new Per3evere\Nsq\nsqphp($lookup, NULL, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

订阅时的去重

回想一下,为了实现高可用性,我们只需在发布时将消息复制到两个不同的 nsqd 服务器。为了执行去重,我们只需提供一个实现了 Per3evere\Nsq\Dedupe\DedupeInterface 的对象。

    public function containsAndAdd($topic, $channel, MessageInterface $msg);

PHP客户端提供了两种机制来在订阅时去重消息。这两种机制都基于与布隆过滤器相反的机制。一种机制维护一个哈希表作为PHP数组(因此绑定到单个进程);另一种机制调用Memcached,因此可以在多个进程之间共享数据结构。

我们可以这样使用它

    $requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
    $dedupe = new Per3evere\Nsq\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
    $nsq = new Per3evere\Nsq\nsqphp($lookup, $dedupe, $requeueStrategy);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

您可以在我的博客上了解更多关于去重的信息,然而,以下内容值得注意

  • 使用Memcached去重后,我们可以愉快地启动N个进程来订阅相同的主题和频道,并且只处理一次消息。
  • 去重不能保证(事实上远非如此)——提供的实现基于有损哈希表,因此在性能上具有概率性。对于同时接收的事件,它们通常会表现出可接受(并且可以通过权衡内存使用和去重能力进行调整)
  • nsq的设计理念是幂等订阅者——例如:您的订阅者必须能够处理重复的消息(写入Cassandra是能够很好地处理执行两次的系统的一个例子)。

日志记录

最后一个可选的依赖项是记录器,形式为实现了Per3evere\Nsq\Logger\LoggerInterface的对象(据我所知,PHP没有提供标准记录器接口)

    public function error($msg);
    public function warn($msg);
    public function info($msg);
    public function debug($msg);

PHP客户端提供了一个记录器,将所有日志信息输出到STDERR。将这些内容结合起来,我们就有了一个类似于test-sub.php文件的东西

    $requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
    $dedupe = new Per3evere\Nsq\Dedupe\OppositeOfBloomFilterMemcached;
    $lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
    $logger = new Per3evere\Nsq\Logger\Stderr;
    $nsq = new Per3evere\Nsq\nsqphp($lookup, $dedupe, $requeueStrategy, logger);
    $nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
        ->run();

    function msgCallback($msg)
    {
        if (rand(1,3) == 1) {
            throw new \Exception('Argh, something bad happened');
        }
        echo $msg->getId() . "\n";
    }

设计日志

  • 主要客户端基于事件循环(由React PHP提供动力)来允许我们处理多个nsqd实例的多个连接