goootlib/monster-mq

RabbitMQ 的 PHP 客户端

dev-master 2020-03-09 10:18 UTC

This package is auto-updated.

Last update: 2024-09-13 16:32:59 UTC


README

MonsterMQ 是 AMQP 客户端的 PHP 实现。它提供了方便且优雅的 API,以及消费者和生产者 AMQP 客户端变体。如果您已经熟悉 AMQP 和 RabbitMQ,请继续阅读本手册,否则我建议您从阅读这篇教程开始。

需求

MonsterMQ 针对使用 0.9.1 版本的 AMQP 协议,因此它支持所有支持 AMQP 0.9.1 的 RabbitMQ 版本。此外,MonsterMQ 需要 PHP 7.1 或更高版本。

安装

要安装库,请使用以下 composer 命令

composer require goootlib/monster-mq:dev-master

或者,您可以将 "goootlib/monster-mq":"dev-master" 添加为新的依赖项到您的 composer.json 文件的 require 部分,如下所示

{
    "require": {
        "other-vendor/other-package": "^5.4",
	"goootlib/monster-mq":"dev-master"
    }
}

然后调用 composer update 命令安装声明的依赖项。请注意 - composer update 还会更新您其他所有依赖项到最新版本。安装后,将 composer 自动加载器包含在您的脚本文件中,以便访问库类

require_once __DIR__.'/vendor/autoload.php';

使用

该库提供了两个类用于操作。生产者

$producer = new \MonsterMQ\Client\Producer();

和消费者

$consumer = new \MonsterMQ\Client\Consumer();

接下来,我们将检查它们提供的所有功能。此外,当在生产环境中使用 MonsterMQ 时,不要忘记将使用 MonsterMQ 的所有代码包含在 try/catch 构造中,并编写处理可能由库抛出的异常实例的代码。

常见功能

两个客户端变体都提供以下功能

  • 使用 TLS 或 TCP 建立网络连接。
  • 使用指定的用户名和密码建立会话。
  • 交换声明。
  • 队列声明。
  • 事件管理。

网络连接建立

为了通过 TCP 协议连接到指定的 RabbitMQ 服务器,首先创建生产者或消费者实例,然后调用带有 RabbitMQ 服务器 IP 地址和端口号的 connect() 方法。您还可以指定连接超时作为 connect() 方法的第三个参数。

$consumer->connect('127.0.0.1', 5672, 10);

您可以省略 connect() 方法的所有参数。在这种情况下,MonsterMQ 将尝试连接到本地的默认端口(5672)上的服务器。

配置网络 TCP 连接

如果您想配置您的网络连接,可以使用消费者或生产者实例的 network()socket() 方法。这两个方法互为别名,并提供对相同程序模块的访问。以下是他们提供的方法

$consumer->socket()->bindTo(9999, '127.0.0.1')->enableNodelay()->disableKeepalive()
  ->setTimeout($seconds, $microseconds)->connect();

bindTo($portNumber, $ipAddress) - 方法将 MonsterMQ 绑定到指定网络接口上的指定端口,如果希望将 MonsterMQ 绑定到任意端口号,请使用此方法。第二个参数(IP 地址)可以省略,在这种情况下,IP 地址将被自动选择。

enableNodelay() - 此方法禁用 Nagle's 算法。默认情况下是启用的。

disableKeepalive() - 此方法禁用 keepalive TCP 功能

setTimeout($seconds, $microseconds) - 此方法用于设置网络连接的读取/写入超时。您可以将方法的第一和第二个参数指定为整数,分别表示秒和微秒,之后如果没有在套接字上进行读取或写入,连接将被关闭。setTimeout()方法还允许传递一个参数,该参数可以是整数或浮点数,如果这个单一参数是浮点数,则它的分数部分将作为微秒处理,而小数点前的数字将作为秒处理。

配置加密网络连接

MonsterMQ 允许使用 TLS 协议进行加密连接。为了使用和配置它,请调用以下方法

$consumer->network()->useTLS()->verifyPeer()->verifyPeerName()->peerName($name)
  ->CA($pathToCAFile)->certificate($pathToCertificateFile)
  ->privateKey($pathToPrivateKey)->password($password)
  ->verifyDepth($number)->ciphers($ciphers)->connect('127.0.0.1', 5671);

请记住在您的 RabbitMQ 配置文件中指定您配置的 TLS 监听器的端口号。

useTLS() - 必须调用此方法以启用 TLS。

verifyPeer() - 为了启用对等方验证,必须调用此方法。如果您只想使用加密而不进行对等方验证,您可以跳过此方法调用。

verifyPeerName() - 可以调用此方法以启用对等方名称验证,如果调用此方法,您还必须通过调用 peerName() 方法指定对等方名称。如果您不想验证远程对等方证书的名称,则不需要调用此方法。

peerName($name) - 当启用对等方名称验证时,使用此方法指定对等方名称。如果您不想验证远程对等方证书的名称,或如果希望对等方名称将根据 connect() 方法的地址参数自动选择,则不需要调用此方法。

CA($pathToCAFile) - 使用此方法指定证书颁发机构文件的路径,以便能够建立 TLS 连接。

privateKey($pathToPrivateKey) - 使用此方法指定私钥文件的路径。

password($password) - 使用此方法指定创建您的证书时使用的密码。如果您的证书是在没有密码的情况下创建的,则不需要调用此方法。

verifyDepth($number) - 指定在验证中断后的证书链长度。

ciphers($list) - 设置用于连接的密码列表。可以通过 'openssl ciphers' 命令行命令获取系统支持的密码列表,该列表的格式为此方法接受。

要启用自签名证书的利用,请使用网络模块的 allowSelfSigned() 方法。

$consumer->network()->useTLS()->allowSelfSigneed()
  ->CA($pathToCAFile)->certificate($pathToCertificateFile)
  ->privateKey($pathToPrivateKey)->password($password)
  ->connect();

enableNodelay()setTimeout() 也可用于加密连接,而 TLS 没有保持活动状态功能。

会话建立

在消费者或生产者实例上使用 logIn() 方法以启动会话。此方法接受两个参数(可以省略),它们是您的 RabbitMQ 用户的用户名和密码。如果您省略登录参数,MonsterMQ 将使用 'guest' 作为用户名,对于密码则是默认 RabbitMQ 用户的凭据。

$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('my-username', 'my-password');

使用会话模块的以下方法来配置会话

$consumer->session()->locale('en_US')->virtualHost('/')->logIn('my-username, 'my-password');

前面的方法允许您选择要使用的区域设置和虚拟主机。还应指出,如果您想连接到运行在 localhost 的 RabbitMQ(默认端口为 5672),您可以省略 connect() 方法的调用,只需调用 logIn() 方法即可建立 TCP 连接和会话。

通道

要更改所使用的通道,请调用消费者或生产者实例的 changeChannel() 方法

$consumer->changeChannel();
$consumer->changeChannel(2);

changeChannel($channel) 方法接受一个可选参数,即将要使用的通道号。如果您省略该参数,此方法将自动选择通道号并返回其值。如果指定的通道被服务器挂起,changeChannel($channel) 将返回 false。要关闭指定的通道,请使用通道号作为参数调用 closeChannel($channel) 方法。要获取当前正在使用的通道,请调用 currentChannel()

事件

在MonsterMQ和RabbitMQ的工作过程中,后者可以挂起或关闭过载的通道。为了处理这些事件,请使用事件模块的以下方法。

$producer->events()->channelSuspesion(
 
 function ($suspendedChannel) use ($producer) {
    echo "channel {$suspendedChannel} was suspended";
    $producer->changeChannel();
  }
 
 )->channelClosure(
 
  function ($closedChannel) use ($producer) {
    echo "channel {$closedChannel} was closed";
    $producer->changeChannel();
   }
 
 );

处理这些事件的关闭函数接受挂起或关闭的通道数量。

交换机

使用declare()方法,配合newDirectExchange($exchangeName)声明新的直接交换机,配合newFanoutExchange($exchangeName)声明新的扇出交换机,或者配合newTopicExchange($exchangeName)在客户端(消费者或生产者)实例上声明新的主题交换机。您还可以将交换机设置为持久化自动删除。持久化交换机在服务器重启时保持活动状态。非持久化交换机(临时交换机)在服务器重启时被清除。自动删除交换机在没有任何队列使用它们时将被删除。

$consumer->newDirectExchange('my-direct')->declare();
$consumer->newFanoutExchange('my-fanout')->setAutodelete()->declare();
$consumer->newTopicExchange('my-topic')->setDurable()->declare();

如果您希望将交换机绑定或解绑到另一个交换机,可以使用以下方法

$consumer->exchange('exchange-to-be-bound')->bind('my-exchange', 'routing-key');
$consumer->exchange('exchange-to-be-unbound')->unbind('my-exchange', 'routing-key');

如果绑定或解绑交换机,不要忘记将路由键作为bind()unbind()方法的第二个参数指定。

队列

为了声明队列,首先需要指定队列名称作为客户端(生产者或消费者)实例的queue()方法的第一个参数,然后调用声明方法。您还可以将队列设置为持久化自动删除独占。持久化队列在服务器重启后仍然存在,而非持久化队列(默认使用)则不会。自动删除队列在没有任何消费者使用时被删除。独占队列只能由当前连接访问,并且在连接关闭时被删除。

$consumer->queue('queue-1')->declare()->bind('my_direct', 'cba');
$consumer->queue('queue-2')->setDurable()->declare()->bind('my_topic','abc');
$consumer->queue('queue-3')->setAutodelete()->declare()->bind('my_direct', 'cab');
$consumer->queue('queue-4')->setExclusive()->declare()->bind('my_direct', 'bca');
$consumer->queue('queue-5')->setDurable()->declare()->bind('my_direct', 'bac');
$consumer->queue('queue-1')->unbind('my_direct', 'abc');

bind($exchange, $routingkey)方法将队列绑定到指定的交换机(第一个方法参数)和指定的路由键(第二个方法参数)。unbind($exchange, $routingKey)方法将队列从交换机解绑,并指定路由键。

$consumer->queue('queue-1')->deleteIfUnused();
$consumer->queue('queue-2')->deleteIfEmpty();
$consumer->queue('queue-3')->delete();
$consumer->queue('queue-4')->purge();

使用delete()deleteIfEmpty()deleteIfUnused()方法删除由queue()方法选定的队列。deleteIfEmpty()方法在队列不包含任何消息时删除队列。deleteIfUnused()方法在没有任何消费者使用该队列时删除队列。delete()方法在任何情况下都删除队列。您还可以使用purge()方法从队列中删除所有未等待确认的消息。这四个方法都返回在删除或净化过程中删除的消息数量。

生产者

使用生产者实例上的publish($message, $routingKey, $exchange)方法向队列发布消息。此方法的第二个参数(用于发布的路由键)可以省略,如果您已经调用了defaultRoutingKey($routingKey)来为所有没有路由键的发布设置默认路由键。除了设置默认路由键外,您还可以覆盖默认交换机,以便在省略publish()方法的第三个参数时使用。要覆盖默认的RabbitMQ交换机,请使用生产者实例上的overrideDefaultExchange($exchange)方法。

$producer->publish('with exchange and routing key specified', 'abc', 'my_direct');
$producer->overrideDefaultExchange('my_direct');
$producer->defaultRoutingKey('abc');
$producer->publish('with overridden exchange and default routing key set');
$producer->publish('with overridden exchange and default routing key set 2');

请注意,默认情况下,RabbitMQ提供默认交换机,该交换机将消息转发到名为发布时使用的路由键的队列。例如,如果您尚未使用overrideDefaultExchange()方法覆盖默认交换机,以下消息将被发送到名为'queue-1'和'queue-2'的队列。

$producer->publish('with default RabbitMQ's exchange', 'queue-1');
$producer->publish('with default RabbitMQ's exchange 2', 'queue-2');

消费者

$consumer->consume('my-queue', true);

使用 consume($queue, $noAck) 方法以开始从队列接收消息。此方法的第一个参数表示要接收消息的队列名称。第二个参数是可选的,如果省略,您将必须使用 ackLast()ackAll()rejectLast()rejectAll() 方法来确认或拒绝每个接收到的消息。如果 consume() 方法的(第二个)参数未设置为 true,则消息将保留在队列中,直到它们得到确认。如果不怕丢失消息,请使用 $noAck 参数。consume() 方法还返回 consumerTag,可用于与 stopConsume($consumerTag) 一起停止消费特定的队列。如果 stopConsume() 方法未带参数调用,它将停止消费当前正在使用的通道的所有队列。

开始消费循环

使用 wait($closure) 以异步方式从服务器接收消息来启动消费循环。此方法仅接受一个参数,即一个闭包,当收到消息时将被调用,该闭包接受两个参数:消息和已使用的通道号。

$consumer = new \MonsterMQ\Client\Consumer();
$consumer->logIn();
$consumer->queue('my-queue')->declare();
$consumer->consume('my-queue');
$consumer->wait(function ($message, $channel) use ($consumer){
   echo $message."\n";
   echo $channel."\n";
   $consumer->ackLast();
});

使用 ackLast() 确认最后一个已接受的消息,ackAll() 确认直到当前处理的消息(包括它)的所有未确认消息。rejectLast() 方法允许客户端拒绝最后一个传入的消息。它可以用来中断和取消大消息,或将无法处理的消息返回到原始队列。rejectAll() 拒绝直到当前处理的消息(包括它)的所有未确认消息。

服务质量

MonsterMQ 中的 prefetchCount($number) 方法允许您提前发送消息,这样当客户端处理完一个消息后,下一个消息已经保存在本地,而不是需要通过通道发送。此设置可以针对每个通道或每个消费者使用。

$producer->qos()->prefetchCount(10)->perConsumer()->apply();
$producer->qos()->prefetchCount(5)->perChannel()->apply();

消息重发

使用 redeliver($requeue) 方法以重发所有未确认的消息。它仅接受一个可选参数,如果省略,将重发消息到原始接收者。如果设置为 true,则尝试重新排队消息,可能将其发送到替代订阅者。

$consumer->wait(function ($message, $channel) use ($consumer){
  echo $message."\n";
  echo $channel."\n";
  $consumer->redeliver();
});

日志记录

如果您检查 MonsterMQ 的日志目录,您会发现按年份命名的目录,其中包含按月份命名的生产者日志文件。消费者不写入日志文件,而是将处理描述输出到 cli 输出。