troogle / php-amqplib
此库是AMQP协议的纯PHP实现。它已针对RabbitMQ进行了测试。
Requires
- php: >=5.3.0
- ext-bcmath: *
- ext-mbstring: *
Requires (Dev)
- phpunit/phpunit: ^3.7
- scrutinizer/ocular: ^1.1
Suggests
- ext-sockets: Use AMQPSocketConnection
This package is not auto-updated.
Last update: 2024-09-18 19:14:52 UTC
README
此分支具有心跳功能,其生命周期将持续到官方仓库批准心跳的pull请求。
此库是AMQP协议的纯PHP实现。它已针对RabbitMQ进行了测试。
需求:PHP 5.3,由于使用了命名空间
。
此库被用于RabbitMQ in Action和官方RabbitMQ教程中的PHP示例。
支持的RabbitMQ版本
从2.0版本开始,此库默认使用AMQP 0.9.1
,因此需要RabbitMQ 2.0或更高版本。您不需要更改代码,但在升级之前请进行测试。
支持的RabbitMQ扩展
由于此库使用AMQP 0.9.1
,我们添加了对以下RabbitMQ扩展的支持
- 交换到交换绑定
- 基本拒绝
- 发布者确认
- 消费者取消通知
修改现有方法(如替代交换
)的扩展也得到支持。
设置
将composer.json
文件添加到您的项目中
{ "require": { "videlalvaro/php-amqplib": "2.5.*" } }
然后,假设您已安装了composer,您可以运行以下命令
$ composer.phar install
这将获取库及其依赖项,并将其放在您的vendor文件夹中。然后,您可以在.php文件中添加以下内容以使用此库
require_once __DIR__.'/vendor/autoload.php';
然后,您需要使用相关的类,例如
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
用法
当RabbitMQ正在运行时,打开两个终端,在第一个终端上执行以下命令以启动消费者
$ cd php-amqplib/demo
$ php amqp_consumer.php
然后在另一个终端上执行以下操作
$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish
您应该在另一个终端上的进程看到消息到达
然后,要停止消费者,向其发送quit
消息
$ php amqp_publisher.php quit
如果您需要监听连接到RabbitMQ的套接字,请参阅非阻塞消费者中的示例。
$ php amqp_consumer_non_blocking.php
变更日志
有关最近更改的更多信息,请参阅CHANGELOG。
教程
为了避免重复,如果您想了解更多关于此库的信息,请参阅官方RabbitMQ教程。
更多示例
amqp_ha_consumer.php
:演示了镜像队列的使用amqp_consumer_exclusive.php
和amqp_publisher_exclusive.php
:演示了使用独占队列的fanout交换amqp_consumer_fanout_{1,2}.php
和amqp_publisher_fanout.php
:演示了具有命名队列的fanout交换basic_get.php
:演示了使用基本获取 AMQP调用来从队列中获取消息。
批量发布
假设您有一个生成大量消息的过程,这些消息将被发布到同一个交换
,使用相同的路由键
和选项(如强制性
)。然后,您可以利用batch_basic_publish
库功能。您可以像这样批量处理消息
$msg = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg, $exchange); $msg2 = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg2, $exchange);
然后像这样发送批处理
$ch->publish_batch();
我们什么时候发布消息批次?
假设我们的程序需要从文件中读取,然后每行发布一条消息。根据消息大小,您需要决定何时发送批次。您可以每50条消息发送一次,或者每100条消息发送一次。这取决于您。
优化消息发布
另一种加快消息发布速度的方法是重用AMQPMessage
消息实例。您可以创建新的消息如下所示
$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);
现在假设您想更改未来消息的消息体,但保留相同的属性,即您的消息仍然是text/plain
,而delivery_mode
仍然是AMQPMessage::DELIVERY_MODE_PERSISTENT
。如果您为每条发布的消息创建一个新的AMQPMessage
实例,那么这些属性就需要在AMQP二进制格式中重新编码。您可以通过重用AMQPMessage
并按如下方式重置消息体来避免所有这些
$msg->setBody($body2); $ch->basic_publish($msg, $exchange);
截断大消息
AMQP不对消息的大小设置限制;如果消费者收到一个非常大的消息,PHP的内存限制可能会在调用传递给basic_consume
的回调之前在库中达到。
为了避免这种情况,您可以在您的Channel实例上调用方法AMQPChannel::setBodySizeLimit(int $bytes)
。超过此限制的消息体将被截断,并带有AMQPMessage::$is_truncated
标志设置为true
的回调交付。属性AMQPMessage::$body_size
将反映接收到的消息的真实体大小,如果消息已被截断,则将高于strlen(AMQPMessage::getBody())
。
请注意,超出限制的所有数据都将从AMQP通道中读取并立即丢弃,因此在您的回调中无法检索。如果您有另一个可以处理具有更大有效负载的消息的消费者,可以使用basic_reject
或basic_nack
告诉服务器(它仍然有一个完整的副本)将其转发到死信交换机。
默认情况下,不会发生截断。要禁用已启用截断的通道上的截断,请将0
(或null
)传递给AMQPChannel::setBodySizeLimit()
。
##UNIX信号##
如果您已安装PCNTL扩展,则在消费者未处理消息时将处理信号的分发。
$pcntlHandler = function ($signal) { switch ($signal) { case \SIGTERM: case \SIGUSR1: case \SIGINT: // some stuff before stop consumer e.g. delete lock etc exit(0); break; case \SIGHUP: // some stuff to restart consumer break; default: // do nothing } }; declare(ticks = 1) { pcntl_signal(\SIGTERM, $pcntlHandler); pcntl_signal(\SIGINT, $pcntlHandler); pcntl_signal(\SIGUSR1, $pcntlHandler); pcntl_signal(\SIGHUP, $pcntlHandler); }
要禁用此功能,只需将常量AMQP_WITHOUT_SIGNALS
定义为true
。
<?php define('AMQP_WITHOUT_SIGNALS', true); ... more code
调试
如果您想了解协议级别的操作情况,请将以下常量添加到您的代码中
<?php define('AMQP_DEBUG', true); ... more code ?>
基准测试
要运行发布/消费基准测试,请输入以下命令
$ make benchmark
测试
要成功运行测试,您需要首先设置测试user
和测试virtual host
。
您可以在启动RabbitMQ后运行以下命令来完成此操作
$ rabbitmqctl add_vhost phpamqplib_testbed $ rabbitmqctl add_user phpamqplib phpamqplib_password $ rabbitmqctl set_permissions -p phpamqplib_testbed phpamqplib ".*" ".*" ".*"
一旦您的环境设置完毕,您就可以按如下方式运行您的测试
$ make test
贡献
有关详细信息,请参阅CONTRIBUTING。
使用AMQP 0.8
如果您仍然想使用旧版本的协议,您可以在配置代码中设置以下常量来做到这一点
define('AMQP_PROTOCOL', '0.8');
默认值是'0.9.1'
。
提供您自己的自动加载器
如果您出于某些原因不想使用composer,那么您需要为库类提供一个自动加载器。人们已经报告使用此自动加载器成功。
原始README
以下为原始README文件内容。归功于原始作者。
实现高级消息队列协议(AMQP)的PHP库。
该库是py-amqplib(http://barryp.org/software/py-amqplib/)的Python代码的移植版本。
它已经在RabbitMQ服务器上进行了测试。
项目主页: http://code.google.com/p/php-amqplib/
如有讨论需求,请加入该群组
http://groups.google.com/group/php-amqplib-devel
如有错误报告,请使用项目页面上的错误跟踪系统。
非常欢迎提交补丁!
作者:Vadim Zaliva lord@crocodile.org