wecc / php-amqplib
之前为 videlalvaro/php-amqplib。这个库是对 AMQP 协议的纯 PHP 实现。已经对 RabbitMQ 进行了测试。
Requires
- php: >=5.6.3
- ext-mbstring: *
- ext-sockets: *
- phpseclib/phpseclib: ^2.0.0
Requires (Dev)
- ext-curl: *
- nategood/httpful: ^0.2.20
- phpunit/phpunit: ^5.7|^6.5|^7.0
- squizlabs/php_codesniffer: ^2.5
Conflicts
- php: 7.4.0 - 7.4.1
Replaces
- videlalvaro/php-amqplib: v2.12.1
- dev-master / 2.12.x-dev
- v2.12.1
- v2.12.0
- v2.11.3
- v2.11.2
- v2.11.1
- v2.11.0
- v2.10.1
- v2.10.0
- v2.10.0-rc1
- v2.9.2
- v2.9.1
- v2.9.0
- v2.9.0-rc2
- v2.9.0-rc1
- v2.9.0-beta.1
- v2.8.2-rc3
- v2.8.2-rc2
- v2.8.2-rc1
- v2.8.1
- v2.8.1-rc3
- v2.8.1-rc2
- v2.8.1-rc1
- v2.8.0
- v2.8.0-rc1
- v2.7.3
- v2.7.2.1
- v2.7.2
- v2.7.1
- v2.7.1-rc2
- v2.7.1-rc1
- v2.7.0
- v2.7.0-rc1
- v2.6.3
- v2.6.2
- v2.6.1
- v2.6.0
- v2.5.2
- v2.5.1
- v2.5.0
- v2.4.1
- v2.4.0
- v2.3.0
- v2.2.6
- v2.2.5
- v2.2.4
- v2.2.3
- v2.2.2
- v2.2.1
- v2.2.0
- v2.1.0
- v2.0.2
- v2.0.1
- v2.0.0
- v1.2.1
- v1.2.0
- v1.1
- dev-dev-3.x
This package is auto-updated.
Last update: 2024-09-04 20:04:02 UTC
README
🚧 实验性 PHP 8 支持。尚未准备好用于生产。
php-amqplib
这个库是对 AMQP 0-9-1 协议 的纯 PHP 实现。已经对 RabbitMQ 进行了测试。
该库被用于 RabbitMQ in Action 的 PHP 例子和 官方 RabbitMQ 教程。
请注意,本项目发布了 贡献者行为准则。通过参与本项目,您同意遵守其条款。
项目维护者
感谢 videlalvaro 和 postalservice14 对 php-amqplib 进行维护的辛勤工作!没有他们的努力,这个库不可能达到现在的水平。
现在,该包由 nubeiro 和几位在 RabbitMQ 及相关项目上工作的 Pivotal 工程师维护。
支持的 RabbitMQ 版本
从版本 2.0 开始,该库默认使用 AMQP 0.9.1
,因此需要 RabbitMQ 2.0 或更高版本。通常,服务器升级不需要修改任何应用程序代码,因为协议变更非常不频繁,但请在升级之前自行进行测试。
支持的 RabbitMQ 扩展
由于该库使用 AMQP 0.9.1
,我们添加了对以下 RabbitMQ 扩展的支持
- 交换到交换的绑定
- 基本否定
- 发布者确认
- 消费者取消通知
修改现有方法(如 备选交换
)的扩展也得到支持。
相关库
-
enqueue/amqp-lib 是一个与 amqp interop 兼容的包装器。
-
AMQProxy 是一个带有连接和通道池/重用的代理库。这允许在使用 php-amqplib 时降低连接和通道的损耗,从而减少 RabbitMQ 的 CPU 使用率。
设置
确保您已安装 composer,然后运行以下命令
$ composer require php-amqplib/php-amqplib
这将从您的 vendor 文件夹中获取库及其依赖项。然后您可以将以下内容添加到您的 .php 文件中,以便使用该库
require_once __DIR__.'/vendor/autoload.php';
然后您需要使用相关的类,例如
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
使用方法
当 RabbitMQ 运行时,打开两个终端,在第一个终端中执行以下命令以启动消费者
$ cd php-amqplib/demo
$ php amqp_consumer.php
然后在另一个终端中做
$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish
您应该会在另一个终端上的进程看到消息到来
要停止消费者,向其发送 quit
消息
$ php amqp_publisher.php quit
如果您需要监听连接到 RabbitMQ 的套接字,请参阅非阻塞消费者中的示例。
$ php amqp_consumer_non_blocking.php
变更日志
请参阅变更日志以了解最近有哪些变化。
API 文档
http://php-amqplib.github.io/php-amqplib/
教程
为了不重复,如果您想了解更多关于这个库的信息,请参阅官方 RabbitMQ 教程。
更多示例
amqp_ha_consumer.php
:演示了镜像队列的使用。amqp_consumer_exclusive.php
和amqp_publisher_exclusive.php
:演示了使用独占队列的 fanout 交换。amqp_consumer_fanout_{1,2}.php
和amqp_publisher_fanout.php
:演示了带有命名队列的 fanout 交换。amqp_consumer_pcntl_heartbeat.php
:演示了基于信号的心跳发送器的使用。basic_get.php
:演示了使用 basic get AMQP 调用从队列中获取消息。
多个主机连接
如果您有一个应用可以连接到多个节点的集群,您可以使用主机数组启动连接。为此,您应该使用 create_connection
静态方法。
例如
$connection = AMQPStreamConnection::create_connection([ ['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST], ['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST] ], $options);
此代码将首先尝试连接到 HOST1
,如果第一个连接失败,则连接到 HOST2
。该方法返回第一个成功连接的连接对象。如果所有连接都失败,它将抛出最后一次连接尝试的异常。
有关更多示例,请参阅 demo/amqp_connect_multiple_hosts.php
。
批量发布
假设您有一个生成大量消息的过程,这些消息将使用相同的 exchange
和相同的 routing_key
以及如 mandatory
之类的选项进行发布。然后您可以利用 batch_basic_publish
库功能。您可以像这样批量处理消息
$msg = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg, $exchange); $msg2 = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg2, $exchange);
然后像这样发送批量消息
$ch->publish_batch();
何时发布消息批量?
假设我们的程序需要从文件中读取然后按行发布一条消息。根据消息大小,您将不得不决定何时发送批量。您可以每50条消息发送一次,或者每100条。这取决于您。
优化消息发布
加快消息发布速度的另一种方法是重用 AMQPMessage
消息实例。您可以像这样创建新消息
$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT); $msg = new AMQPMessage($body, $properties); $ch->basic_publish($msg, $exchange);
现在假设您在想要更改未来消息的消息体时,将保持相同的属性,即您的消息仍然是 text/plain
,而 delivery_mode
仍然是 AMQPMessage::DELIVERY_MODE_PERSISTENT
。如果您为每个发布的消息创建一个新的 AMQPMessage
实例,那么这些属性将必须重新编码在 AMQP 二进制格式中。您可以通过重用 AMQPMessage
并像这样重置消息体来避免所有这些
$msg->setBody($body2); $ch->basic_publish($msg, $exchange);
截断大型消息
AMQP 对消息大小没有限制;如果消费者收到一个非常大的消息,PHP 的内存限制可能在这个库调用 basic_consume
传递的回调之前就被达到。
为了避免这种情况,您可以在您的 Channel 实例上调用方法 AMQPChannel::setBodySizeLimit(int $bytes)
。超出此限制的消息体将被截断,并带有 AMQPMessage::$is_truncated
标志设置为 true
的消息传递给您的回调。属性 AMQPMessage::$body_size
将反映接收到的消息的真实大小,如果消息被截断,它将高于 strlen(AMQPMessage::getBody())
。
注意,所有超过限制的数据都将从 AMQP Channel 中读取并立即丢弃,因此您无法在回调中检索它。如果您有另一个可以处理更大有效负载的消息的消费者,您可以使用 basic_reject
或 basic_nack
告诉服务器(它仍然有一个完整的副本)将其转发到死信交换。
默认情况下,不会发生截断。要禁用已启用截断的 Channel 上的截断,请将 0
(或 null
)传递给 AMQPChannel::setBodySizeLimit()
。
连接恢复
一些RabbitMQ客户端使用自动连接恢复机制,在网络错误发生时重新连接并恢复通道和消费者。
由于该客户端使用单线程,您可以通过异常处理机制设置连接恢复。
可能抛出的连接错误异常
PhpAmqpLib\Exception\AMQPConnectionClosedException PhpAmqpLib\Exception\AMQPIOException \RuntimeException \ErrorException
可能还会抛出其他异常,但连接仍然存在。在重新连接之前,清理旧连接始终是一个好主意。
例如,如果您想设置一个恢复连接
$connection = null; $channel = null; while(true){ try { $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); // Your application code goes here. do_something_with_connection($connection); } catch(AMQPRuntimeException $e) { echo $e->getMessage(); cleanup_connection($connection); usleep(WAIT_BEFORE_RECONNECT_uS); } catch(\RuntimeException $e) { cleanup_connection($connection); usleep(WAIT_BEFORE_RECONNECT_uS); } catch(\ErrorException $e) { cleanup_connection($connection); usleep(WAIT_BEFORE_RECONNECT_uS); } }
完整的示例在 demo/connection_recovery_consume.php
中。
此代码将在每次异常发生时重新连接并重试应用程序代码。某些异常仍然可能会抛出,并且不应作为重新连接过程的一部分进行处理,因为它们可能是应用程序错误。
这种方法主要适用于消费者应用程序,生产者将需要一些额外的应用程序代码来避免多次发布相同的消息。
这是一个最简单的示例,在现实生活中的应用程序中,您可能想要控制重试次数并可能优雅地降低重新连接的等待时间。
您可以在 #444 中找到更多示例。
UNIX信号
如果您已安装 PCNTL扩展,则在消费者未处理消息时将处理信号的派发。
$pcntlHandler = function ($signal) { switch ($signal) { case \SIGTERM: case \SIGUSR1: case \SIGINT: // some stuff before stop consumer e.g. delete lock etc pcntl_signal($signal, SIG_DFL); // restore handler posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html case \SIGHUP: // some stuff to restart consumer break; default: // do nothing } }; pcntl_signal(\SIGTERM, $pcntlHandler); pcntl_signal(\SIGINT, $pcntlHandler); pcntl_signal(\SIGUSR1, $pcntlHandler); pcntl_signal(\SIGHUP, $pcntlHandler);
要禁用此功能,只需定义常量 AMQP_WITHOUT_SIGNALS
为 true
<?php define('AMQP_WITHOUT_SIGNALS', true); ... more code
基于信号的Heartbeat
如果您已安装 PCNTL扩展 并使用PHP 7.1或更高版本,则可以注册基于信号的 heartbeat 发送器。
<?php $sender = new PCNTLHeartbeatSender($connection); $sender->register(); ... code $sender->unregister();
调试
如果您想了解协议层面的情况,请将以下常量添加到您的代码中
<?php define('AMQP_DEBUG', true); ... more code ?>
基准测试
要运行发布/消费基准测试,请输入
$ make benchmark
测试
要成功运行测试,您首先需要在本地运行一个RabbitMQ代理。然后,运行测试,如下所示
$ make test
贡献
有关详细信息,请参阅 CONTRIBUTING
使用AMQP 0.8
如果您仍然想使用旧版本的协议,则可以在配置代码中设置以下常量来实现。
define('AMQP_PROTOCOL', '0.8');
默认值为 '0.9.1'
。
提供自己的自动加载器
如果您出于某种原因不想使用composer,则需要为库类提供自动加载器。人们已经 报告 使用此 自动加载器 成功。
原始README
以下是原始README文件的内容。归功于原始作者。
实现高级消息队列协议(AMQP)的PHP库。
该库是py-amqplib(http://barryp.org/software/py-amqplib/)的python代码的移植。
它已在RabbitMQ服务器上进行了测试。
项目主页: http://code.google.com/p/php-amqplib/
有关讨论,请加入该组
http://groups.google.com/group/php-amqplib-devel
有关错误报告,请使用项目页面上的错误跟踪系统。
欢迎补丁!
作者:Vadim Zaliva lord@crocodile.org