pom86/php-amqplib

此包已被废弃,不再维护。未建议替代包。

此库是对 AMQP 协议的纯 PHP 实现。它已经过 RabbitMQ 的测试。

v2.5.1 2015-04-01 07:20 UTC

README

Build Status

此库是对 AMQP 协议的 纯 PHP 实现。它已经过 RabbitMQ 的测试。

需求:PHP 5.3,由于使用了 命名空间

该库被用于 RabbitMQ in Action 的 PHP 示例以及 官方 RabbitMQ 教程

支持的 RabbitMQ 版本

从 2.0 版本开始,此库默认使用 AMQP 0.9.1,因此需要 RabbitMQ 2.0 或更高版本。您不需要更改代码,但在升级前请进行测试。

支持的 RabbitMQ 扩展

由于该库使用 AMQP 0.9.1,我们添加了对以下 RabbitMQ 扩展的支持

  • 交换机到交换机绑定
  • 基本否定
  • 发布者确认
  • 消费者取消通知

修改现有方法(如 备用交换机)的扩展也得到支持。

设置

composer.json 文件添加到您的项目中

{
  "require": {
      "videlalvaro/php-amqplib": "2.2.*"
  }
}

假设您已安装 composer,则可以运行以下命令

$ composer.phar install

这将获取库及其依赖项并将其放在您的 vendor 文件夹中。然后您可以在 .php 文件中添加以下内容以使用该库

require_once __DIR__.'/vendor/autoload.php';

然后您需要使用相关类,例如

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

用法

在 RabbitMQ 运行时,打开两个终端,并在第一个终端中执行以下命令以启动消费者

$ cd php-amqplib/demo
$ php amqp_consumer.php

然后在另一个终端中做

$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish

您应该会在另一个终端的进程中看到消息到达

然后,要停止消费者,向它发送 quit 消息

$ php amqp_publisher.php quit

如果您需要监听连接到 RabbitMQ 的套接字,请参阅非阻塞消费者的示例。

$ php amqp_consumer_non_blocking.php

教程

为了不重复,如果您想了解更多关于此库的信息,请参阅 官方 RabbitMQ 教程

更多示例

  • amqp_ha_consumer.php:演示了镜像队列的使用
  • amqp_consumer_exclusive.phpamqp_publisher_exclusive.php:演示了使用专用队列的扇出交换机。
  • amqp_consumer_fanout_{1,2}.phpamqp_publisher_fanout.php:演示了使用命名队列的扇出交换机。
  • basic_get.php:演示了通过使用 基本获取 AMQP 调用来从队列中获取消息。

批量发布

假设您有一个进程,它生成大量消息,这些消息将使用相同的 exchange、相同的 routing_key 以及如 mandatory 这样的选项进行发布。然后,您可以利用 batch_basic_publish 库功能。您可以像这样批量发送消息:

$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);

$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);

然后像这样发送批次:

$ch->publish_batch();

何时发布消息批次?

假设我们的程序需要从文件中读取并逐行发布一条消息。根据消息大小,您将不得不决定何时发送批次。您可以每发送50条消息或每发送100条消息。这完全取决于您。

优化消息发布

另一种加快消息发布的方法是重用 AMQPMessage 消息实例。您可以创建新的消息如下:

$properties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);

现在,假设您想要更改未来消息的消息正文,但保持相同的属性,即,您的消息仍然是 text/plain,并且 delivery_mode 仍然是 2。如果您为每条发布的消息创建一个新的 AMQPMessage 实例,那么这些属性将不得不在 AMQP 二进制格式中重新编码。您可以避免所有这些,只需重用 AMQPMessage 并按照以下方式重置消息正文:

$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);

UNIX 信号

如果您已安装 PCNTL 扩展,则在消费者未处理消息时将处理信号的分派。

$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stop consumer e.g. delete lock etc
            exit(0);
            break;
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};

declare(ticks = 1) {
    pcntl_signal(\SIGTERM, $pcntlHandler);
    pcntl_signal(\SIGINT,  $pcntlHandler);
    pcntl_signal(\SIGUSR1, $pcntlHandler);
    pcntl_signal(\SIGHUP,  $pcntlHandler);
}

要禁用此功能,只需将常量 AMQP_WITHOUT_SIGNALS 定义为 true

<?php
define('AMQP_WITHOUT_SIGNALS', true);

... more code

调试

如果您想了解协议层面的情况,请在代码中添加以下常量:

<?php
define('AMQP_DEBUG', true);

... more code

?>

基准测试

要运行发布/消费基准测试,请输入:

$ make benchmark

测试

要成功运行测试,您需要首先设置测试 user 和测试 virtual host

您可以在启动 RabbitMQ 后运行以下命令来完成此操作:

$ rabbitmqctl add_vhost phpamqplib_testbed
$ rabbitmqctl add_user phpamqplib phpamqplib_password
$ rabbitmqctl set_permissions -p phpamqplib_testbed phpamqplib ".*" ".*" ".*"

一旦您的环境设置完成,您可以像这样运行您的测试:

$ make test

使用 AMQP 0.8

如果您仍然想使用旧版本的协议,可以通过在配置代码中设置以下常量来实现:

define('AMQP_PROTOCOL', '0.8');

默认值是 '0.9.1'

提供您自己的自动加载器

如果您出于某种原因不想使用 composer,则需要为库类提供一个自动加载器。有人 报告 使用此 自动加载器 成功。

原始 README

以下是原始 README 文件的内容。归功于原始作者。

PHP 库实现高级消息队列协议 (AMQP)。

该库是 py-amqplib 的 Python 代码的移植版本 http://barryp.org/software/py-amqplib/

已经与 RabbitMQ 服务器进行了测试。

项目主页: http://code.google.com/p/php-amqplib/

请加入以下组以进行讨论:

http://groups.google.com/group/php-amqplib-devel

有关错误报告,请使用项目页面上的错误跟踪系统。

欢迎提交补丁!

作者:Vadim Zaliva lord@crocodile.org