mouf/oo-amqp-client

在php-amqplib之上提供面向对象包装的对象,帮助以面向对象的方式处理RabbitMQ。

v1.1.2 2018-10-30 13:56 UTC

This package is auto-updated.

Last update: 2024-09-15 04:47:24 UTC


README

Latest Stable Version Total Downloads Latest Unstable Version License Scrutinizer Code Quality Build Status Coverage Status

关于面向对象AMQP客户端

此包包含一个在php-amqplib之上提供面向对象包装的对象,帮助以面向对象的方式处理RabbitMQ。

使用此包,交换绑定队列都表示为对象。这对于您想要将对象注入依赖注入容器的情况非常有用。

安装

composer require mouf/oo-amqp-client

使用方法

在使用此库之前,您应该熟悉AMQP概念。如果您不熟悉,我们强烈建议您从阅读RabbitMQ文档中的"AMQP 0-9-1模型解释"文档开始。

准备好了吗?让我们开始吧。

创建客户端

您首先想要创建的是Client对象。一个Client表示与RabbitMQ的连接(对于熟悉php-amqplib的您来说,它既是连接也是通道)。

use Mouf\AmqpClient\Client;

$client = new Client(
    $rabbitmq_host,
    $rabbitmq_port,
    $rabbitmq_user,
    $rabbitmq_password,
    $rabbitmq_vhost = '/',
    $rabbitmq_insist = false,
    $rabbitmq_login_method = 'AMQPLAIN',
    $rabbitmq_login_response = null,
    $rabbitmq_locale = 'en_US',
    $rabbitmq_connection_timeout = 3.0,
    $rabbitmq_read_write_timeout = 3.0,
    $rabbitmq_context = null,
    $rabbitmq_keepalive = false,
    $rabbitmq_heartbeat = 0
);

注意:Client类公开了多个有用的配置方法(如果您不知道它们的作用,则不需要使用它们)

public function setPrefetchSize($prefetchSize);
public function setPrefetchCount($prefetchCount);
public function setAGlobal($aGlobal);

创建交换

在AMQP中,交换是接收消息并将这些消息转发到队列的对象。因此,您必须定义一个Exchange对象来发送消息。

use Mouf\AmqpClient\Objects\Exchange;

$exchange = new Exchange($client, 'exchange_name', 'fanout');

在创建交换时,您将传递给构造函数的Client对象、交换名称和交换类型。

注意:交换将在客户端中自动注册

您可以使用配置方法应用高级配置

public function setPassive($passive);
public function setDurable($durable);
public function setAutoDelete($autoDelete);
public function setInternal($internal);
public function setNowait($nowait);
public function setArguments($arguments);
public function setTicket($ticket);

创建队列和绑定

到达交换的消息将通过绑定转发到一个队列

我们现在创建一个队列来存储我们的消息。

use Mouf\AmqpClient\Objects\Queue;

$queue = new Queue($client, 'queue_name', [
    new Consumer(function(AMQPMessage $msg) {
        // Do some stuff with the received message
    })
]);

在创建客户端时,您将传递给构造函数的Client对象、客户端名称以及一个包含Consumer对象的数组(实际上是一个实现ConsumerInterface的对象的数组)。

Consumer对象是包含每次收到消息时将被调用的代码的对象。

注意:队列将在客户端中自动注册

您可以使用这些配置方法对您的队列应用高级配置

public function setPassive($passive);
public function setDurable($durable);
public function setExclusive($exclusive);
public function setAutoDelete($autoDelete);
public function setNoWait($noWait);
public function setArguments($arguments);
public function setTicket($ticket);
public function setDeadLetterExchange(Exchange $exchange);
public function setConfirm($confirm);
public function setConsumerCancelNotify(Queue $consumerCancelNotify);
public function setAlternateExchange(Queue $alternateExchange);
public function setTtl($ttl);
public function setMaxLength($maxLength);
public function setMaxPriority($maxPriority);

如果您想使队列在接收器断电时存储消息,您肯定想使用setDurable方法。

此时,我们有一个交换和一个队列,但它们没有连接在一起。我们需要使用Binding对象来绑定它们。

use Mouf\AmqpClient\Objects\Binding;

$binding = new Binding($exchange, $queue);
$client->register($binding);

Binding将交换与队列链接。

重要:与ExchangeQueue不同,Binding不会在客户端中自动注册。您必须使用Client::register方法自己声明它。

准备好了吗?让我们发送和接收消息!

发送消息

为了发送消息,您只需使用Exchange::publish方法

$exchange->publish(new Message('your message body'), 'message_key');
// ... and that's it!

您还可以进一步配置消息的发送。Exchange::publish方法接受一些可选参数

public function publish(Message $message,
                        string $routingKey,
                        bool $mandatory = false,
                        bool $immediate = false,
                        $ticket = null);

此外,您还可以使用这些方法之一调整Message

public function setContentType(string $content_type);
public function setContentEncoding(string $content_encoding);
public function setApplicationHeaders(array $application_headers);
public function setDeliveryMode(int $delivery_mode);
public function setPriority(int $priority);
public function setCorrelationId(string $correlation_id);
public function setReplyTo(string $reply_to);
public function setExpiration(string $expiration);
public function setMessageId(string $message_id);
public function setTimestamp(\DateTimeInterface $timestamp);
public function setType(string $type);
public function setUserId(string $user_id);
public function setAppId(string $app_id);
public function setClusterId(string $cluster_id);

接收消息

正如我们已经看到的,接收消息的第一步是创建一个队列并将Consumer对象添加到该队列中。

我们还需要告诉PHP开始监听,否则,Consumer中的回调永远不会被调用。

这可以通过使用ConsumerService类来实现。

$consumerService = new ConsumerService($client, [
    $queue
]);

$consumerService->run();

ConsumerService构造函数接受客户端作为参数,以及必须监听的队列数组。

ConsumerService::run方法将开始监听到达的消息,在一个无限循环中。

注意,如果你想只监听一条消息并在之后返回,可以使用$consumerService->run(true);

确认和错误处理

当你收到一条消息时,在Consumer完成消息消费之前不会发送确认。

如果在Consumer中触发异常,则会向RabbitMQ发送一个nack

注意:如果你的消费者回调抛出实现了RetryableExceptionInterface接口的异常,则带有“requeue”标记的nack消息将被发送。该消息将被重新入队。

注意:如果你的消费者回调抛出实现了FatalExceptionInterface接口的异常,异常将被消费者传播(从而导致消费者脚本的崩溃)。否则,消费者将继续处理消息。

默认情况下,异常使用error_log函数进行记录。您可以通过将PSR-3兼容的记录器传递给AbstractConsumer构造函数来覆盖此行为。

将您的消费者编写为类

到目前为止,为了创建一个消费者,我们使用了接受回调作为第一个构造参数的Consumer类。

作为替代方案,您可以扩展AbstractConsumer类并实现onMessageReceived方法

class MyConsumer extends AbstractConsumer
{
    public function onMessageReceived($msg)
    {
        // Do some stuff.
    }
}

向特定队列发送消息

如果您想针对特定队列并直接向其发送消息,您有2种选择。

选项1:创建一个DefaultExchange对象并将队列名称作为消息的键。

use Mouf\AmqpClient\Objects\DefaultExchange;

$exchange = new DefaultExchange($client);
// Simply pass the queue name as the second parameter of "publish".
// Note: you do not need to bind the queue to the exchange. RabbitMQ does this automatically.
$exchange->publish(new Message('your message body'), 'name_of_the_target_queue');
// ... and that's it!

选项2:使用Queue对象的publish方法

use Mouf\AmqpClient\Objects\Queue;

$queue = new Queue($client, 'queue_name', [
    new Consumer(function(AMQPMessage $msg) {
        // Do some stuff with the received message
    })
]);

// Shazam! We are directly sending a message to the queue. No exchange needed!
$queue->publish(new Message('your message body'));

注意:这些是RabbitMQ特有的功能,可能不适用于其他AMQP总线。

Symfony控制台集成

此软件包附带2个Symfony命令,您可以使用它们发送和接收消息。

  • Mouf\AmqpClient\Commands\PublishCommand (amqp:publish) 允许您向交换发送任意消息(从文件或从STDIN读取)
  • Mouf\AmqpClient\Commands\ConsumeCommand (amqp:consume) 监听所有配置的队列

运行单元测试

此软件包使用PHPUnit进行单元测试。

要运行测试

vendor/bin/phpunit

显然,您需要一个正在运行的RabbitMQ服务器来测试此软件包。如果您使用Docker,可以使用以下命令启动一个:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:management