romainrg / codeigniter-php-amqplib

这个库是AMQP协议的纯PHP实现。它已经过与RabbitMQ的测试。

安装次数: 14,686

依赖者: 1

建议者: 0

安全: 0

星星: 1

关注者: 2

分支: 1,027

类型:codeigniter-third-party

v2.6.2 2015-12-23 12:21 UTC

README

Latest Version on Packagist Software License Build Status Coverage Status Quality Score Total Downloads

这个库是AMQP协议的纯PHP实现。它已经过与RabbitMQ的测试。

要求: PHP 5.3,因为使用了命名空间

这个库被用于RabbitMQ in Action的PHP示例和官方RabbitMQ教程

支持的RabbitMQ版本

从2.0版本开始,这个库默认使用AMQP 0.9.1,因此需要RabbitMQ 2.0或更高版本。您不需要更改代码,但在升级前请进行测试。

支持的RabbitMQ扩展

由于这个库使用了AMQP 0.9.1,我们添加了对以下RabbitMQ扩展的支持

  • Exchange到Exchange绑定
  • 基本Nack
  • 发布者确认
  • 消费者取消通知

修改现有方法(如替代交换)的扩展也得到支持。

设置

composer.json文件添加到您的项目中

{
  "require": {
      "videlalvaro/php-amqplib": "2.5.*"
  }
}

然后假设您已安装composer,您可以运行以下命令

$ composer.phar install

这将从您的vendor文件夹中获取库及其依赖项。然后您可以将以下内容添加到您的.php文件中,以便使用该库

require_once __DIR__.'/vendor/autoload.php';

然后您需要使用相关的类,例如

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

使用

在RabbitMQ运行时,打开两个终端,在第一个终端中执行以下命令以启动消费者

$ cd php-amqplib/demo
$ php amqp_consumer.php

然后在另一个终端中执行

$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish

您应该会在另一个终端上的进程中看到消息到达

然后要停止消费者,向它发送quit消息

$ php amqp_publisher.php quit

如果您需要监听连接到RabbitMQ的套接字,请参阅非阻塞消费者的示例。

$ php amqp_consumer_non_blocking.php

变更日志

有关最近更改的更多信息,请参阅CHANGELOG

教程

为了避免重复,如果您想了解有关此库的更多信息,请参阅官方RabbitMQ教程

更多示例

  • amqp_ha_consumer.php:演示了使用镜像队列的使用
  • amqp_consumer_exclusive.phpamqp_publisher_exclusive.php:演示了使用独占队列的fanout交换
  • amqp_consumer_fanout_{1,2}.phpamqp_publisher_fanout.php:演示了带有命名队列的fanout交换
  • basic_get.php:演示了使用basic get AMQP调用从队列中获取消息

批量发布

假设您有一个生成大量消息的过程,这些消息将使用相同的exchange和相同的routing_key以及选项如mandatory发布到同一地方。然后您可以使用batch_basic_publish库功能。您可以像这样批量处理消息

$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);

$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);

然后像这样发送批次

$ch->publish_batch();

我们何时发布消息批次?

假设我们的程序需要从文件中读取,然后每行发布一条消息。根据消息的大小,您需要决定何时发送批次更好。您可以每50条消息发送一次,或者每100条。这取决于您。

优化消息发布

另一种加快消息发布速度的方法是重用AMQPMessage消息实例。您可以创建新消息如下

$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);

现在假设您想更改未来消息的消息体,但将保持相同的属性,即您的消息仍然是text/plain,而delivery_mode仍然是AMQPMessage::DELIVERY_MODE_PERSISTENT。如果您为每条发布的消息创建一个新的AMQPMessage实例,那么这些属性将必须重新编码为AMQP二进制格式。您可以通过重用AMQPMessage并按如下方式重置消息体来避免所有这些

$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);

截断大型消息

AMQP对消息大小没有限制;如果消费者收到非常大的消息,PHP的内存限制可能在调用传递给basic_consume的回调之前达到。

为了避免这种情况,您可以在您的Channel实例上调用方法AMQPChannel::setBodySizeLimit(int $bytes)。超过此限制的正文大小将被截断,并通过将AMQPMessage::$is_truncated标志设置为true交付给您的回调。属性AMQPMessage::$body_size将反映接收到的消息的真实正文大小,如果消息被截断,则该大小将大于strlen(AMQPMessage::getBody())

请注意,所有超过限制的数据都将从AMQP Channel中读取并立即丢弃,因此您无法在回调中检索它。如果您有另一个可以处理具有更大有效负载的消息的消费者,您可以使用basic_rejectbasic_nack告诉服务器(服务器仍然有一个完整的副本)将其转发到死信交换机。

默认情况下,不会发生截断。要禁用已启用的Channel上的截断,请将0(或null)传递给AMQPChannel::setBodySizeLimit()

##UNIX信号##

如果您已安装PCNTL扩展,则在消费者不处理消息时将处理信号的分派。

$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stop consumer e.g. delete lock etc
            exit(0);
            break;
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};

declare(ticks = 1) {
    pcntl_signal(\SIGTERM, $pcntlHandler);
    pcntl_signal(\SIGINT,  $pcntlHandler);
    pcntl_signal(\SIGUSR1, $pcntlHandler);
    pcntl_signal(\SIGHUP,  $pcntlHandler);
}

要禁用此功能,只需将常量AMQP_WITHOUT_SIGNALS定义为true

<?php
define('AMQP_WITHOUT_SIGNALS', true);

... more code

调试

如果您想了解协议层面的情况,请在您的代码中添加以下常量

<?php
define('AMQP_DEBUG', true);

... more code

?>

基准测试

要运行发布/消费基准测试,请输入以下命令

$ make benchmark

测试

要成功运行测试,您需要首先设置测试user和测试virtual host

您可以通过启动RabbitMQ后运行以下命令来完成此操作

$ rabbitmqctl add_vhost phpamqplib_testbed
$ rabbitmqctl add_user phpamqplib phpamqplib_password
$ rabbitmqctl set_permissions -p phpamqplib_testbed phpamqplib ".*" ".*" ".*"

一旦您的环境设置完毕,您就可以按如下方式运行您的测试

$ make test

贡献

有关详细信息,请参阅CONTRIBUTING

使用AMQP 0.8

如果您仍然想使用旧版本的协议,您可以通过在配置代码中设置以下常量来实现

define('AMQP_PROTOCOL', '0.8');

默认值是'0.9.1'

提供自己的自动加载器

如果您出于某种原因不想使用composer,那么您需要一个自动加载器来为库类提供。人们已经报告使用此自动加载器成功。

原始README

以下是原始README文件的内容。归功于原始作者。

实现高级消息队列协议(AMQP)的PHP库。

该库是py-amqplib的Python代码移植版本,py-amqplib的网址为:http://barryp.org/software/py-amqplib/

它已经在RabbitMQ服务器上进行了测试。

项目主页: http://code.google.com/p/php-amqplib/

如有讨论需求,请加入该群组

http://groups.google.com/group/php-amqplib-devel

如有错误报告,请在项目页面的错误跟踪系统中使用。

欢迎提交补丁!

作者:Vadim Zaliva lord@crocodile.org