icyboy / nsqphp
NSQ 的 PHP 客户端
Requires
- php: >=5.3.0
- monolog/monolog: ~1.0
- react/react: >=0.2.1
This package is not auto-updated.
Last update: 2024-09-15 04:47:20 UTC
README
PHP 客户端用于 NSQ。
NSQ 基础知识
您可以通过 GitHub 上的说明 或通过描述它的 Bitly 博客文章 来了解有关 NSQ 的所有信息。关于 nsqd、nsqlookupd 的更多详细信息可在项目中的每个文件夹内找到。
我所学到的
- 仅从 nsqlookupd 发现特定主题的消息宿主机的角度提供了集群功能。要从集群中消费,您只需询问一个
nslookupd
实例消息在哪里,然后连接到它告诉您的每个nsqd
(这是使 nsq 优秀的原因之一)。 - 由于每个
nsqd
实例都是隔离的,因此 pub 的 HA(高可用性)非常容易;您可以简单地连接到 任何 并发送消息(我已经将其构建到客户端中)。 - 通过简单地写入多个
nsqd
并在订阅时去重提供容错性(我已经将其构建到客户端中)。 - nsq 不是作为 工作队列(用于长时间运行的任务)而设计的。默认的
msg-timeout
设置为 60,000ms(60 秒)。这是 nsq 在将其自动视为失败并重新排队之前所等待的时间。我们的“工作”应该比这短得多。此外,PHP 是一种阻塞语言,尽管我们正在使用非阻塞 IO 事件循环,但您在处理消息时所做的任何工作都将阻止客户端能够回复任何心跳等。
安装
nsqphp
可通过 composer 添加到您的项目中。只需将以下内容添加到您的 composer.json 文件中。
{
...
"require": {
...
"icyboy/nsqphp": "dev-master"
}
...
}
您也可以简单地将其克隆到您的项目中。
git clone git://github.com/icyxp/nsqphp.git
cd nsqphp
git submodule update --init --recursive
要在项目中使用 nsqphp
,只需包含 bootstrap.php
文件,或通过 composer 设置自动加载。设计本身适用于依赖注入容器(所有依赖项都是构造函数注入),尽管您可以在使用时手动设置依赖项。
尝试使用它
按照 入门指南 在本地主机上安装 nsq。
发布一些事件
php demo/test-pub.php 10
在一个 shell 中启动一个订阅者
php demo/test-sub.php mychannel > /tmp/processed-messages
然后在另一个 shell 中跟踪重定向的 STDOUT,以便您可以查看接收和处理的消息
tail -f /tmp/processed-messages
注意
在这些测试中,我首先发布消息,因为我还没有让客户端自动重新发现具有给定主题消息的节点;因此,如果您首先订阅,则找不到具有该主题消息的节点。
其他测试
多个通道
博客文章描述了一个通道
| 每个通道都接收主题的所有消息的副本。在实际应用中,通道映射到消费主题的下游服务。
因此,每个 topic
中的消息都将发送到每个 channel
。
启动两个带有不同通道的订阅者(每个 shell 中一个)
php demo/test-sub.php mychannel
php demo/test-sub.php otherchannel
发布一些消息
php demo/test-pub.php 10
每条消息都将发送到每个频道。值得注意的是,API 允许您在同一个过程中订阅多个主题/频道。
多个 nsqds
设置一些运行 nsqd
和 nsqlookupd
的服务器,主机名分别为 nsq1
、nsq2
... 现在向它们发送大量消息
php demo/test-pub.php 10 nsq1
php demo/test-pub.php 10 nsq2
现在订阅
php demo/test-sub.php mychannel > /tmp/processed-messages
您将收到 20 条消息。
弹性投递
与前一个测试相同,但这次我们将相同的消息发送到两个 nsqd
实例,然后在订阅时进行去重。
php demo/test-pub.php 10 nsq1,nsq2
php demo/test-sub.php mychannel > /tmp/processed-messages
这次您应该只收到 10 条消息。
待办事项
- 使用回退策略重新排队失败的消息(目前只有简单的固定延迟重新排队策略)
- 持续评估哪些节点包含特定主题的消息(即已订阅的主题),并为这些客户端(通过事件循环计时器)建立新的连接
PHP 客户端界面
消息
消息由 nsqphp\Message\Message 类封装,并在代码中通过接口引用(因此您可以自己实现)。
接口
public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();
发布
客户端支持向 N 个 nsqd
服务器发布,必须通过主机名显式指定。与订阅不同,没有通过 nslookupd
查找主机名的功能(我们可能也不想这样做,因为速度更快)。
最小化方法
$nsq = new nsqphp\nsqphp; $nsq->publishTo('localhost') ->publish('mytopic', new nsqphp\Message\Message('some message payload'));
您需要决定如何/是否对有效负载进行编码(例如:JSON)。
高可用性发布
$nsq = new nsqphp\nsqphp; $nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), nsqphp\nsqphp::PUB_QUORUM) ->publish('mytopic', new nsqphp\Message\Message('some message payload'));
我们需要 publishTo
nsqd 守护进程的多数派响应才能认为此操作成功(目前这是按顺序发生的)。假设我在三个主机上运行了 3 个 nsqd
,这些主机可以通过 nsq1
等进行访问。
这种技术将会记录两次消息,这需要在订阅时进行去重。
订阅
客户端支持从 N 个 nsqd
服务器订阅,每个服务器将自动从一个或多个 nslookupd
服务器发现。这是通过 nslookupd
能够提供一个列表来实现的,该列表包含为给定主题托管消息的自动发现节点。此功能将我们的客户端与知道在哪里找到消息的需求解耦。
因此,在订阅时,我们首先需要初始化我们的查找服务对象
$lookup = new nsqphp\Lookup\Nsqlookupd;
或者,也可以
$lookup = new nsqphp\Lookup\Nsqlookupd('nsq1,nsq2');
然后我们可以使用这个来订阅
$lookup = new nsqphp\Lookup\Nsqlookupd; $nsq = new nsqphp\nsqphp($lookup); $nsq->subscribe('mytopic', 'somechannel', function($msg) { echo $msg->getId() . "\n"; })->run();
警告:如果我们的回调抛出任何异常,则不会使用这些设置重新尝试消息 - 请继续阅读以了解更多信息。
或者有点像 PHP 的风格
$lookup = new nsqphp\Lookup\Nsqlookupd; $nsq = new nsqphp\nsqphp($lookup); $nsq->subscribe('mytopic', 'somechannel', 'msgCallback') ->run(); function msgCallback($msg) { echo $msg->getId() . "\n"; }
我们还可以订阅多个频道/流
$lookup = new nsqphp\Lookup\Nsqlookup; $nsq = new nsqphp\nsqphp($lookup); $nsq->subscribe('mytopic', 'somechannel', 'msgCallback') ->subscribe('othertopic', 'somechannel', 'msgCallback') ->run();
重试失败的消息
PHP 客户端将捕获回调中抛出的任何异常,然后要么(a)重试,要么(b)丢弃消息。通常您不会想要丢弃消息。
为了解决这个问题,我们需要一个 重新排队策略 - 这是以实现 nsqphp\RequeueStrategy\RequeueStrategyInterface
的任何对象的形式。
public function shouldRequeue(MessageInterface $msg);
客户端目前附带一个;固定延迟策略
$requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay; $lookup = new nsqphp\Lookup\Nsqlookupd; $nsq = new nsqphp\nsqphp($lookup, NULL, $requeueStrategy); $nsq->subscribe('mytopic', 'somechannel', 'msgCallback') ->run(); function msgCallback($msg) { if (rand(1,3) == 1) { throw new \Exception('Argh, something bad happened'); } echo $msg->getId() . "\n"; }
订阅时的去重
回想一下,为了实现高可用性,我们只需将发布到两个不同的 nsqd
服务器上即可。为了执行去重,我们只需要提供一个实现 nsqphp\Dedupe\DedupeInterface
的对象。
public function containsAndAdd($topic, $channel, MessageInterface $msg);
PHP 客户端为在订阅时去重消息提供了两种机制。这两种机制都是基于 与 bloom 过滤器相反的技术。一个是维护一个 PHP 数组作为哈希表(因此绑定到单个进程);另一个调用 Memcached,因此可以在许多进程之间共享数据结构。
我们可以这样使用
$requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay; $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached; $lookup = new nsqphp\Lookup\Nsqlookupd; $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy); $nsq->subscribe('mytopic', 'somechannel', 'msgCallback') ->run(); function msgCallback($msg) { if (rand(1,3) == 1) { throw new \Exception('Argh, something bad happened'); } echo $msg->getId() . "\n"; }
您可以在我的博客上了解更多关于去重的内容:点击这里查看,但以下内容值得您牢记
- 使用Memcached去重,我们可以开心地启动N个进程来订阅相同的主题和频道,并且只需处理一次消息。
- 去重不是保证的(实际上远非如此)——所提供的实现基于一种损失性哈希表,因此其性能具有概率性。对于在同一时间下发的事件,它们通常会表现出可接受的表现(并且可以通过权衡内存使用和去重能力进行调节)。
- nsq的设计理念是基于幂等订阅者——例如:您的订阅者必须能够处理重复消息的处理(例如,将数据写入Cassandra的系统就是一个很好地处理重复执行的例子)。
日志记录
最后的可选依赖项是一个日志记录器,它以实现了nsqphp\Logger\LoggerInterface
的对象形式存在(据我所知,PHP没有提供标准的日志接口)。
public function error($msg); public function warn($msg); public function info($msg); public function debug($msg);
PHP客户端附带了一个日志记录器,它会将所有日志信息输出到STDERR。将这些内容组合起来,我们会得到类似于test-sub.php
文件的内容。
$requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay; $dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached; $lookup = new nsqphp\Lookup\Nsqlookupd; $logger = new nsqphp\Logger\Stderr; $nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, $logger); $nsq->subscribe('mytopic', 'somechannel', 'msgCallback') ->run(); function msgCallback($msg) { if (rand(1,3) == 1) { throw new \Exception('Argh, something bad happened'); } echo $msg->getId() . "\n"; }
设计日志
- 主要客户端基于事件循环(由React PHP提供动力),允许我们处理多个
nsqd
实例的多个连接。