decrypted/php-amqplib

之前为 videlalvaro/php-amqplib。这是一个 AMQP 协议的纯 PHP 实现。它已经在 RabbitMQ 上进行了测试。

v3.4.0 2022-10-18 20:52 UTC

README

PHPUnit tests Latest Version on Packagist Total Downloads Software License

codecov Coverage Status Quality Score

这个库是 纯 PHP 实现的 AMQP 0-9-1 协议。它已经在 RabbitMQ 上进行了测试。

该库被用于 RabbitMQ in Action 的 PHP 示例和 官方 RabbitMQ 教程

请注意,该项目以 贡献者行为准则 发布。通过参与此项目,您同意遵守其条款。

项目维护者

感谢 videlalvaropostalservice14 创建 php-amqplib

该软件包现在由 Ramūnas DrongaLuke Bakken 以及 RabbitMQ 上的几个 VMware 工程师维护。

支持的 RabbitMQ 版本

从版本 2.0 开始,此库默认使用 AMQP 0.9.1,因此需要 RabbitMQ 2.0 或更高版本。通常,服务器升级不需要对应用程序代码进行任何更改,因为协议更改非常不频繁,但在升级之前请自行进行测试。

支持的 RabbitMQ 扩展

由于该库使用 AMQP 0.9.1,我们添加了对以下 RabbitMQ 扩展的支持

  • 交换机到交换机绑定
  • 基本否定
  • 发布者确认
  • 消费者取消通知

修改现有方法(如 备用交换机)的扩展也得到支持。

相关库

  • enqueue/amqp-lib 是一个与 amqp interop 兼容的包装器。

  • AMQProxy 是一个具有连接和通道池/重用的代理库。这允许在使用 php-amqplib 时减少连接和通道的损耗,从而降低 RabbitMQ 的 CPU 使用率。

设置

请确保已安装 composer,然后运行以下命令

$ composer require php-amqplib/php-amqplib

这将获取库及其依赖项,并将其放在您的 vendor 文件夹中。然后您可以将以下内容添加到您的 .php 文件中,以便使用该库

require_once __DIR__.'/vendor/autoload.php';

然后您需要使用相关的类,例如

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

用法

当 RabbitMQ 运行时,打开两个终端,在第一个终端上执行以下命令以启动消费者

$ cd php-amqplib/demo
$ php amqp_consumer.php

然后在另一个终端上执行以下操作

$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish

您应该看到消息到达另一个终端上的进程

然后,要停止消费者,向其发送 quit 消息

$ php amqp_publisher.php quit

如果您需要监听连接到 RabbitMQ 的套接字,请参阅非阻塞消费者的示例。

$ php amqp_consumer_non_blocking.php

变更日志

有关最近更改的更多信息,请参阅 CHANGELOG

API 文档

http://php-amqplib.github.io/php-amqplib/

教程

为了不重复,如果您想了解更多关于这个库的信息,请参阅官方RabbitMQ教程

更多示例

  • amqp_ha_consumer.php:演示了镜像队列的使用。
  • amqp_consumer_exclusive.phpamqp_publisher_exclusive.php:演示了使用专用队列的fanout交换。
  • amqp_consumer_fanout_{1,2}.phpamqp_publisher_fanout.php:演示了使用命名队列的fanout交换。
  • amqp_consumer_pcntl_heartbeat.php:演示了基于信号的 heartbeat 发送器使用。
  • basic_get.php:演示了使用基本获取 AMQP 调用来从队列中获取消息。

多个主机连接

如果您有一个您的应用程序可以连接的多个节点集群,您可以使用主机数组开始连接。为此,您应使用create_connection静态方法。

例如

$connection = AMQPStreamConnection::create_connection([
    ['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
    ['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
],
$options);

此代码将首先尝试连接到HOST1,如果第一个连接失败,则连接到HOST2。该方法返回第一个成功连接的连接对象。如果所有连接都失败,它将抛出最后一次连接尝试的异常。

有关更多示例,请参阅demo/amqp_connect_multiple_hosts.php

批量发布

假设您有一个生成大量消息的过程,这些消息将被发布到相同的exchange,使用相同的routing_key和选项如mandatory。然后您可以使用batch_basic_publish库功能。您可以按如下方式批量消息

$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);

$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);

然后按如下方式发送批量

$ch->publish_batch();

何时发布消息批量?

假设我们的程序需要从文件中读取然后每行发布一条消息。根据消息的大小,您将不得不决定何时发送批量。您可以在50条消息后发送,或者每100条发送。这取决于您。

优化消息发布

另一种加快消息发布速度的方法是重用AMQPMessage消息实例。您可以创建新的消息如下

$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);

现在假设您想在未来的消息中更改消息体,但是您将保留相同的属性,也就是说,您的消息仍然是text/plain,并且delivery_mode仍然是AMQPMessage::DELIVERY_MODE_PERSISTENT。如果您为每条发布的消息创建一个新的AMQPMessage实例,那么这些属性将不得不重新编码在AMQP二进制格式中。您可以通过重用AMQPMessage并按如下方式重置消息体来避免所有这些

$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);

截断大消息

AMQP对消息的大小没有限制;如果消费者接收到一个非常大的消息,PHP的内存限制可能会在调用传递给basic_consume的回调之前在库中达到。

为了避免这种情况,您可以在您的Channel实例上调用方法AMQPChannel::setBodySizeLimit(int $bytes)。超过此限制的正文大小将被截断,并带有AMQPMessage::$is_truncated标志设置为true的正文大小交付给您的回调。属性AMQPMessage::$body_size将反映接收到的消息的真实正文大小,这将比strlen(AMQPMessage::getBody())高,如果消息已被截断。

请注意,所有超出限制的数据都是从AMQP Channel中读取的,并立即丢弃,因此无法在您的回调中检索它。如果您有另一个可以处理具有更大有效负载的消息的消费者,您可以使用basic_rejectbasic_nack告诉服务器(它仍然有一个完整的副本)将其转发到死信交换。

默认情况下,不会发生截断。要在已启用截断的通道上禁用截断,请将0(或null)传递给AMQPChannel::setBodySizeLimit()

连接恢复

一些RabbitMQ客户端使用自动连接恢复机制来重新连接并恢复网络错误时的通道和消费者。

由于此客户端使用单线程,您可以使用异常处理机制设置连接恢复。

可能抛出的连接错误异常

PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException

可能会抛出其他异常,但连接仍然存在。在重新连接之前处理异常时,始终清理旧连接是一个好主意。

例如,如果您想设置一个恢复连接

$connection = null;
$channel = null;
while(true){
    try {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        // Your application code goes here.
        do_something_with_connection($connection);
    } catch(AMQPRuntimeException $e) {
        echo $e->getMessage();
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\RuntimeException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\ErrorException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    }
}

完整的示例在demo/connection_recovery_consume.php中。

此代码将在每次发生异常时重新连接并重试应用程序代码。某些异常仍然可能被抛出,不应将其作为重新连接过程的一部分处理,因为它们可能是应用程序错误。

此方法主要用于消费者应用程序,生产者需要一些额外的应用程序代码以避免多次发布相同的消息。

这是一个最简单的示例,在实际应用程序中,您可能希望控制重试次数并可能优雅地降低重新连接的等待时间。

您可以在#444中找到一个更极端的示例。

UNIX信号

如果您已安装PCNTL扩展,则当消费者未处理消息时将处理信号的调度。

$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stop consumer e.g. delete lock etc
            pcntl_signal($signal, SIG_DFL); // restore handler
            posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};

pcntl_signal(\SIGTERM, $pcntlHandler);
pcntl_signal(\SIGINT,  $pcntlHandler);
pcntl_signal(\SIGUSR1, $pcntlHandler);
pcntl_signal(\SIGHUP,  $pcntlHandler);

要禁用此功能,只需将常量AMQP_WITHOUT_SIGNALS定义为true

<?php
define('AMQP_WITHOUT_SIGNALS', true);

... more code

基于信号的心跳

如果您已安装PCNTL扩展且使用PHP 7.1或更高版本,则可以注册基于信号的时钟发送器。

<?php

$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
... code
$sender->unregister();

调试

如果您想了解协议级别的操作,请将以下常量添加到您的代码中

<?php
define('AMQP_DEBUG', true);

... more code

?>

基准测试

要运行发布/消费基准测试,请键入

$ make benchmark

测试

要成功运行测试,您需要首先在本地上运行一个RabbitMQ代理。然后,运行类似以下的测试

$ make test

贡献

有关详细信息,请参阅CONTRIBUTING

使用AMQP 0.8

如果您仍然想使用旧版本的协议,您可以在配置代码中设置以下常量来完成此操作

define('AMQP_PROTOCOL', '0.8');

默认值是'0.9.1'

提供您自己的自动加载器

如果您出于某种原因不想使用composer,则需要为库类提供一个自动加载器。有人报告使用此自动加载器已成功。

原始README

以下为原始README文件内容。归功于原始作者。

实现高级消息队列协议(AMQP)的PHP库。

该库是将py-amqplib的python代码http://barryp.org/software/py-amqplib/移植到PHP。

它已与RabbitMQ服务器进行了测试。

项目主页:http://code.google.com/p/php-amqplib/

有关讨论,请加入以下小组

http://groups.google.com/group/php-amqplib-devel

有关错误报告,请在项目页面上使用错误跟踪系统。

欢迎补丁!

作者:Vadim Zaliva lord@crocodile.org