mirecx /
原名为 videlalvaro/php-amqplib。这是一个纯PHP实现的AMQP协议库。它已针对RabbitMQ进行了测试。
Requires
- php: >=5.3.0
- ext-bcmath: *
- ext-mbstring: *
Requires (Dev)
- phpdocumentor/phpdocumentor: ^2.9
- phpunit/phpunit: ^4.8
- scrutinizer/ocular: ^1.1
- squizlabs/php_codesniffer: ^2.5
Suggests
- ext-sockets: Use AMQPSocketConnection
Replaces
- videlalvaro/php-amqplib: v2.7.2
- dev-master / 2.7.x-dev
- v2.7.2
- v2.7.1
- v2.7.1-rc2
- v2.7.1-rc1
- v2.7.0
- v2.7.0-rc1
- v2.6.3
- v2.6.2
- v2.6.1
- v2.6.0
- v2.5.2
- v2.5.1
- v2.5.0
- v2.4.1
- v2.4.0
- v2.3.0
- v2.2.6
- v2.2.5
- v2.2.4
- v2.2.3
- v2.2.2
- v2.2.1
- v2.2.0
- v2.1.0
- v2.0.2
- v2.0.1
- v2.0.0
- v1.2.1
- v1.2.0
- v1.1
- dev-revert-460-HHVM-compat-bugfix
- dev-channel_connection_closed
This package is not auto-updated.
Last update: 2024-09-19 11:03:39 UTC
README
本库是一个纯PHP实现的 AMQP 0-9-1 协议。它已针对 RabbitMQ 进行了测试。
要求:PHP 5.3,因为使用了 命名空间。
要求:bcmath 和 mbstring 扩展 该库使用了 bcmath 和 mbstring PHP 扩展。安装步骤因PHP版本和底层操作系统而异。以下示例显示了如何在Ubuntu 15.10上添加现有的PHP安装
sudo apt-get install php7.0-mbstring sudo apt-get install php7.0-bcmath
该库被用于 RabbitMQ in Action 和 官方 RabbitMQ 教程 的 PHP 示例。
请注意,该项目发布时附带了一个 贡献者行为准则。通过参与本项目,您同意遵守其条款。
项目维护者
感谢 videlalvaro 和 postalservice14 对 php-amqplib 的辛勤维护!没有他们的努力,这个库不可能达到现在的水平。
该项目现在由 nubeiro 和几位在RabbitMQ和相关项目上工作的Pivotal工程师维护。
支持的 RabbitMQ 版本
从版本2.0开始,本库默认使用 AMQP 0.9.1,因此需要 RabbitMQ 2.0 或更高版本。通常,服务器升级不需要更改任何应用程序代码,因为协议变化非常少,但在升级之前请自行进行测试。
支持的 RabbitMQ 扩展
由于库使用了 AMQP 0.9.1,我们增加了对以下 RabbitMQ 扩展的支持
- 交换到交换绑定
- 基本否定
- 发布者确认
- 消费者取消通知
修改现有方法(如 备用交换)的扩展也得到支持。
相关库
- enqueue/amqp-lib 是一个与 amqp interop 兼容的包装器。
设置
请确保已安装 composer,然后运行以下命令
$ composer require php-amqplib/php-amqplib
这将获取库及其依赖项,并将其放置在您的 vendor 文件夹中。然后您可以将以下内容添加到您的 .php 文件中,以便使用该库
require_once __DIR__.'/vendor/autoload.php';
然后您需要使用相关的类,例如
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
用法
当 RabbitMQ 运行时,打开两个终端,在第一个终端中执行以下命令以启动消费者
$ cd php-amqplib/demo
$ php amqp_consumer.php
然后在另一个终端中执行
$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish
您应该看到消息到达另一个终端上的进程
然后,要停止消费者,向其发送 quit 消息
$ php amqp_publisher.php quit
如果您需要监听连接到 RabbitMQ 的套接字,请参阅非阻塞消费者的示例。
$ php amqp_consumer_non_blocking.php
变更日志
请参阅CHANGELOG以获取有关最近更改的更多信息。
教程
为了避免重复,如果您想了解更多关于这个库的信息,请参阅官方RabbitMQ教程。
更多示例
amqp_ha_consumer.php:演示了镜像队列的使用。amqp_consumer_exclusive.php和amqp_publisher_exclusive.php:演示了使用专用队列的fanout交换。amqp_consumer_fanout_{1,2}.php和amqp_publisher_fanout.php:演示了具有命名队列的fanout交换。basic_get.php:演示了通过使用基本获取AMQP调用来从队列中获取消息。
批量发布
假设您有一个生成大量消息的过程,这些消息将被发布到相同的exchange,使用相同的routing_key和选项如mandatory。然后您可以使用batch_basic_publish库功能。您可以像这样批量发送消息
$msg = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg, $exchange); $msg2 = new AMQPMessage($msg_body); $ch->batch_basic_publish($msg2, $exchange);
然后像这样发送批次
$ch->publish_batch();
何时发布消息批次?
假设我们的程序需要从文件中读取并逐行发布一条消息。根据消息的大小,您将不得不决定何时发送批次。您可以每50条消息发送一次,或者每100条。这取决于您。
优化消息发布
加快消息发布速度的另一种方法是重用AMQPMessage消息实例。您可以像这样创建新消息
$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);
现在假设您想在将来更改消息体,但会保持相同的属性,即您的消息仍然是text/plain,而delivery_mode仍然是AMQPMessage::DELIVERY_MODE_PERSISTENT。如果您为每条发布的消息创建一个新的AMQPMessage实例,那么这些属性将必须重新编码在AMQP二进制格式中。您可以通过重用AMQPMessage并像这样重置消息体来避免所有这些
$msg->setBody($body2); $ch->basic_publish($msg, $exchange);
截断大消息
AMQP不对消息大小施加限制;如果一个非常大的消息被消费者接收,PHP的内存限制可能会在调用传递给basic_consume的回调之前在库中达到。
为了避免这种情况,您可以在Channel实例上调用方法AMQPChannel::setBodySizeLimit(int $bytes)。超过此限制的正文大小将被截断,并通过将AMQPMessage::$is_truncated标志设置为true将其传递给您的回调。属性AMQPMessage::$body_size将反映接收到的消息的真实正文大小,如果消息已被截断,则将高于strlen(AMQPMessage::getBody())。
注意,超过限制的所有数据都将从AMQP通道中读取并立即丢弃,因此无法在回调中检索它。如果您有另一个可以处理具有更大有效负载的消息的消费者,您可以使用basic_reject或basic_nack来告诉服务器(它仍然有一个完整的副本)将其转发到死信交换。
默认情况下,不会发生截断。要禁用已启用的通道上的截断,将0(或null)传递给AMQPChannel::setBodySizeLimit()。
UNIX信号
如果您已安装PCNTL扩展,则在消费者未处理消息时将处理信号的分派。
$pcntlHandler = function ($signal) { switch ($signal) { case \SIGTERM: case \SIGUSR1: case \SIGINT: // some stuff before stop consumer e.g. delete lock etc pcntl_signal($signal, SIG_DFL); // restore handler posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html case \SIGHUP: // some stuff to restart consumer break; default: // do nothing } }; pcntl_signal(\SIGTERM, $pcntlHandler); pcntl_signal(\SIGINT, $pcntlHandler); pcntl_signal(\SIGUSR1, $pcntlHandler); pcntl_signal(\SIGHUP, $pcntlHandler);
要禁用此功能,只需将常量AMQP_WITHOUT_SIGNALS定义为true
<?php define('AMQP_WITHOUT_SIGNALS', true); ... more code
调试
如果您想了解协议级别发生的事情,请将以下常量添加到您的代码中
<?php define('AMQP_DEBUG', true); ... more code ?>
基准测试
运行发布/消费基准测试类型
$ make benchmark
测试
要成功运行测试,您首先需要设置测试 用户 和测试 虚拟主机。
您可以在启动 RabbitMQ 后运行以下命令来完成此操作
$ rabbitmqctl add_vhost phpamqplib_testbed $ rabbitmqctl add_user phpamqplib phpamqplib_password $ rabbitmqctl set_permissions -p phpamqplib_testbed phpamqplib ".*" ".*" ".*"
一旦您的环境设置完毕,您就可以这样运行测试
$ make test
贡献
有关详细信息,请参阅 CONTRIBUTING
使用 AMQP 0.8
如果您仍然想使用旧版协议,则可以在您的配置代码中设置以下常量来实现
define('AMQP_PROTOCOL', '0.8');
默认值是 '0.9.1'。
提供自己的自动加载器
如果您出于某种原因不想使用 composer,那么您需要为库类提供一个自动加载器。有人 报告 使用此 自动加载器 成功。
原始 README
以下是原始 README 文件的内容。归功于原始作者。
实现高级消息队列协议 (AMQP) 的 PHP 库。
该库是 py-amqplib 的 python 代码的移植版本 http://barryp.org/software/py-amqplib/
已经与 RabbitMQ 服务器进行过测试。
项目主页: http://code.google.com/p/php-amqplib/
请加入以下小组进行讨论
http://groups.google.com/group/php-amqplib-devel
请使用项目页面上的错误跟踪系统提交错误报告。
欢迎提交补丁!
作者:Vadim Zaliva lord@crocodile.org