php-amqplib/rabbitmq-bundle

整合了 php-amqplib 与 Symfony 和 RabbitMq。以前是 emag-tech-labs/rabbitmq-bundle,老声/rabbitmq-bundle。

安装: 162,841,020

依赖者: 72

建议者: 3

安全: 0

星星: 1,219

关注者: 54

分支: 470

开放问题: 3

类型:symfony-bundle

2.16.0 2024-03-25 12:12 UTC

README

Latest Version Test Scrutinizer Code Quality Code Coverage PHPStan Join the chat at https://gitter.im/php-amqplib/RabbitMqBundle

关于

RabbitMqBundle 通过 RabbitMQ 使用 php-amqplib 库在您的应用程序中实现消息。

该组件实现了 Thumper 库中看到的几种消息模式。因此,从 Symfony 控制器发布到 RabbitMQ 的消息就像这样简单

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

稍后当您想从 upload_pictures 队列中消费 50 条消息时,您只需在 CLI 上运行

$ ./app/console rabbitmq:consumer -m 50 upload_picture

所有示例都假定有一个正在运行的 RabbitMQ 服务器。

该组件在 Symfony Live Paris 2011 会议中展出。查看幻灯片 这里

版本 2

由于 Symfony >=4.4 导致的破坏性更改,发布了一个新标签,使组件与 Symfony >=4.4 兼容。

安装

对于 Symfony 框架 >= 4.4

使用 composer 需求组件及其依赖项

$ composer require php-amqplib/rabbitmq-bundle

注册组件

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

享受!

对于使用 Symfony Console、依赖注入和配置组件的控制台应用程序

如果您有一个用于运行 RabbitMQ 消费者的控制台应用程序,则不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件加载此组件配置和服务,然后使用消费者命令。

在您的 composer.json 文件中需求组件

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "^2.0",
    }
}

注册扩展和编译器传递

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

警告 - BC 破坏性更改

  • 自 2012-06-04 以来,某些在 "producers" 配置部分声明的交换的默认选项已更改为与 "consumers" 部分中声明的交换的默认值匹配。受影响的设置是

    • durablefalse 更改为 true,
    • auto_deletetrue 更改为 false.

    如果您的配置依赖于以前的默认值,则必须更新您的配置。

  • 自 2012-04-24 以来,ConsumerInterface::execute 方法的签名已更改

  • 自 2012-01-03 以来,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是体。

使用

在您的配置文件中添加 old_sound_rabbit_mq 部分

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3
            
            # the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
            channel_rpc_timeout: 0.0

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            #requires php_sockets.dll
            use_socket: true # default false
            
            login_method: 'AMQPLAIN' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://rabbitmq.cn/docs/access-control#mechanisms
            
        another:
            # A different (unused) connection defined by an URL. One can omit all parts,
            # except the scheme (amqp:). If both segment in the URL and a key value (see above)
            # are given the value from the URL takes precedence.
            # See https://rabbitmq.cn/uri-spec.html on how to encode values.
            url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6'
    producers:
        upload_picture:
            connection:            default
            exchange_options:      {name: 'upload-picture', type: direct}
            service_alias:         my_app_service # no alias by default
            default_routing_key:   'optional.routing.key' # defaults to '' if not set
            default_content_type:  'content/type' # defaults to 'text/plain'
            default_delivery_mode: 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service
            options:
                no_ack:       false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://rabbitmq.cn/confirms.html for details.

在此,我们配置连接服务和我们的应用程序将拥有的消息端点。在此示例中,您的服务容器将包含服务 old_sound_rabbit_mq.upload_picture_producerold_sound_rabbit_mq.upload_picture_consumer。后者期望存在名为 upload_picture_service 的服务。

如果您未指定客户端的连接,则客户端将查找具有相同别名的连接。因此,对于我们的 upload_picture,服务容器将查找名为 upload_picture 的连接。

如果您需要添加可选队列参数,则您的队列选项可以是以下这样

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

另一个具有 20 秒消息 TTL 的示例

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

参数值必须是一个数据类型和值的列表。有效的数据类型有

  • S - 字符串
  • I - 整数
  • D - 小数
  • T - 时间戳
  • F - 表格
  • A - 数组
  • t - 布尔

根据您的需求调整 arguments

如果您想将队列与特定的路由键绑定,可以在生产者或消费者配置中声明。

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

重要提示 - 懒连接

在 Symfony 环境中,从版本 >= 4.3 开始,您可以声明一个服务为懒加载(懒加载服务)。这个包仍然不支持新的懒加载服务功能,但您可以在连接配置中设置 lazy: true,以避免在每个请求中与消息代理建立不必要的连接。由于性能原因,强烈建议使用懒连接,尽管默认情况下禁用了懒连接选项,以避免可能破坏已经使用此包的应用程序。

重要提示 - 心跳

read_write_timeout 设置为心跳的两倍,这样您的套接字就会保持开启状态。如果您不这样做,或者使用不同的倍数,存在风险,消费者的套接字将超时。

请注意,如果您的任务通常运行时间超过心跳周期,您可能会遇到问题(链接)。考虑使用较大的心跳值,或者禁用心跳以启用 tcp 的 keepalive(客户端和服务器端)以及 graceful_max_execution_timeout 功能。

多个主机

您可以为连接提供多个主机。这将允许您使用具有多个节点的 RabbitMQ 集群。

  old_sound_rabbit_mq:
      connections:
          default:
              hosts:
                - host: host1
                  port: 3672
                  user: user1
                  password: password1
                  vhost: vhost1
                - url: 'amqp://guest:password@localhost:5672/vhost'
              connection_timeout: 3
              read_write_timeout: 3

请注意,您不能为每个主机分别指定

  connection_timeout 
  read_write_timeout
  use_socket
  ssl_context
  keepalive
  heartbeat
  connection_parameters_provider 

参数。

动态连接参数

有时您的连接信息可能需要是动态的。动态连接参数允许您通过服务以编程方式提供或覆盖参数。

例如,在一个场景中,连接的 vhost 参数取决于您定制的应用程序的当前租户,并且您不想(或不能)每次都更改其配置。

connection_parameters_provider 下定义一个实现 ConnectionParametersProviderInterface 的服务,并将其添加到适当的 connections 配置中。

connections:
    default:
        host:     'localhost'
        port:     5672
        user:     'guest'
        password: 'guest'
        vhost:    'foo' # to be dynamically overridden by `connection_parameters_provider`
        connection_parameters_provider: connection_parameters_provider_service

示例实现

class ConnectionParametersProviderService implements ConnectionParametersProvider {
    ...
    public function getConnectionParameters() {
        return array('vhost' => $this->getVhost());
    }
    ...
}

在这种情况下,vhost 参数将被 getVhost() 的输出覆盖。

生产者、消费者,什么?

在消息应用程序中,将消息发送到代理的过程称为 生产者,而接收这些消息的过程称为 消费者。在您的应用程序中,您将拥有几个这样的过程,您可以在配置中的相应条目下列出它们。

生产者

生产者将用于向服务器发送消息。在 AMQP 模型中,消息被发送到 交换机,这意味着在配置生产者时,您必须指定连接选项以及交换机选项,通常将是交换机的名称和类型。

现在假设您想处理后台的图片上传。在将图片移动到其最终位置后,您将向服务器发布包含以下信息的消息

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

如您所见,如果您的配置中有一个名为 upload_picture 的生产者,那么在服务容器中,您将有一个名为 old_sound_rabbit_mq.upload_picture_producer 的服务。

除了消息本身外,OldSound\RabbitMqBundle\RabbitMq\Producer#publish() 方法还接受一个可选的路由键参数和一个可选的附加属性数组。附加属性数组允许您修改默认情况下使用 PhpAmqpLib\Message\AMQPMessage 对象构造时使用的属性。这样,例如,您可以更改应用程序头。

您可以使用 setContentTypesetDeliveryMode 方法来分别设置消息内容类型和消息投递模式,以覆盖“producers”配置部分中设置的任何默认值。如果没有被“producers”配置或对这些方法的显式调用(如下面的示例所示)覆盖,则默认值为内容类型 text/plain 和投递模式 2

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

如果您需要为生产者使用自定义类(该类应继承自 OldSound\RabbitMqBundle\RabbitMq\Producer),则可以使用 class 选项。

    ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...

下一个难题是拥有一个消费者,它会从队列中取出消息并相应地处理它。

消费者

消费者将连接到服务器并开始一个 循环 等待接收到的消息进行处理。这样的消费者指定的 回调 将决定其行为。让我们回顾一下上面的消费者配置。

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

正如我们所见,回调 选项引用了 upload_picture_service。当消费者从服务器接收到消息时,它将执行这样的回调。如果您需要指定不同的回调以进行测试或调试,则可以将其更改。

除了回调之外,我们还指定了要使用的连接方式,这与使用 生产者 的方式相同。其余选项是 exchange_optionsqueue_optionsexchange_options 应与用于 生产者 的相同。在 queue_options 中,我们将提供一个 队列名称。为什么?

正如我们所言,在 AMQP 中,消息被发布到 交换机 上。这并不意味着消息已经到达 队列。为了实现这一点,我们首先需要创建这样的 队列,然后将其绑定到 交换机 上。酷的地方在于您可以将多个 队列 绑定到一个 交换机 上,这样一条消息就可以到达多个目的地。这种方法的优点是生产者和消费者之间的 解耦。生产者不关心将有多少消费者处理他的消息。它只需要消息到达服务器。这样我们就可以在不更改控制器中的代码的情况下,扩展每次上传图片时执行的操作。

现在,如何运行消费者?有一个命令可以执行,如下所示

$ ./app/console rabbitmq:consumer -m 50 upload_picture

这意味着什么?我们正在执行 upload_picture 消费者,并告诉它只消费50条消息。每次消费者从服务器接收到消息时,它将执行配置的回调,将 AMQP 消息作为 PhpAmqpLib\Message\AMQPMessage 类的实例传递。可以通过调用 $msg->body 获取消息体。默认情况下,消费者将以 无限循环 的方式处理消息,其中“无限”有一个定义。

如果您想确保消费者在 Unix 信号上立即执行完毕,可以使用带标志 -w 的命令。

$ ./app/console rabbitmq:consumer -w upload_picture

然后消费者将立即执行完毕。

要使用带有此标志的命令,您需要安装带有 PCNTL 扩展的 PHP

如果您想设置消费者内存限制,可以使用标志 -l。以下示例中,此标志添加了256MB内存限制。消费者将在达到256MB之前停止5MB,以避免PHP“允许的内存大小”错误。

$ ./app/console rabbitmq:consumer -l 256

如果您想删除队列中等待的所有消息,可以执行此命令来清除此队列

$ ./app/console rabbitmq:purge --no-confirmation upload_picture

要删除消费者的队列,请使用此命令

$ ./app/console rabbitmq:delete --no-confirmation upload_picture

消费者事件

这在许多场景中都很有用。有3个AMQP事件

消费时
class OnConsumeEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_CONSUME;

    /**
     * OnConsumeEvent constructor.
     *
     * @param Consumer $consumer
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
    }
}

假设您需要在新的应用程序部署时暂停/停止消费者。您可以监听 OldSound\RabbitMqBundle\Event\OnConsumeEvent` 并检查新的应用程序部署。

处理消息前
class BeforeProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

    /**
     * BeforeProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}

在处理 AMQPMessage 之前引发的事件。

处理消息后
class AfterProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

    /**
     * AfterProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}

在处理 AMQPMessage 之后引发的事件。如果处理消息将抛出异常,则不会引发事件。

空闲消息
<?php
class OnIdleEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_IDLE;

    /**
     * OnIdleEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
        
        $this->forceStop = true;
    }
}

wait 方法因超时而退出且未收到消息时引发的事件。为了使用此事件,消费者必须配置 idle_timeout。默认情况下,在空闲超时时退出,您可以通过在监听器中设置 $event->setForceStop(false) 来防止它。

空闲超时

如果您需要在一段时间内没有从队列中收到消息时设置超时,可以设置秒为单位的 idle_timeoutidle_timeout_exit_code 指定消费者在空闲超时时应返回的退出代码。如果不指定,则消费者将抛出 PhpAmqpLib\Exception\AMQPTimeoutException 异常。

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service
        idle_timeout:           60
        idle_timeout_exit_code: 0

超时等待

设置秒为单位的 timeout_waittimeout_wait 指定消费者在确认当前连接仍然有效之前,在没有收到新消息的情况下将等待多长时间。

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service
        idle_timeout:           60
        idle_timeout_exit_code: 0
        timeout_wait:           10

优雅的最大执行超时

如果您希望消费者运行一定时间然后优雅地退出,请设置秒为单位的 graceful_max_execution.timeout。 "优雅退出" 意味着消费者将在当前运行的任务完成后退出,或者当等待新任务时立即退出。graceful_max_execution.exit_code 指定消费者在优雅的最大执行超时时应返回的退出代码。如果不指定,则消费者将使用状态 0 退出。

此功能与supervisord配合使用时非常好,它可以允许定期清理内存泄漏、更新数据库/与RabbitMQ的连接等。

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service

        graceful_max_execution:
            timeout: 1800 # 30 minutes 
            exit_code: 10 # default is 0 

公平分发

您可能已经注意到,分发仍然没有完全按我们的要求进行。例如,在有两个工作者的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作者会一直忙碌,而另一个工作者几乎不做任何工作。RabbitMQ 对此一无所知,仍然会平均分发消息。

这是因为 RabbitMQ 只有在消息进入队列时才会分发消息。它不会查看消费者未确认的消息数量。它只是盲目地将每第 n 个消息分发给第 n 个消费者。

为了克服这一点,我们可以使用带有 prefetch_count=1 设置的基本.qos 方法。这告诉 RabbitMQ 不要一次给工作者发送超过一条消息。换句话说,不要在工作者处理并确认上一条消息之前发送新消息。相反,它将将其分发给下一个不忙碌的工作者。

来源:https://rabbitmq.cn/tutorials/tutorial-two-python.html

在实现公平调度时要注意,这会引入延迟,可能会影响性能(见这篇博客文章)。但是,实现它可以使你随着队列的增加动态地水平扩展。你应该根据博客文章的建议,评估与处理每条消息所需时间和你的网络性能相应的 prefetch_size 的正确值。

使用 RabbitMqBundle,你可以这样配置每个消费者的 qos_options

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}

自动注入生产者和消费者

如果与 Symfony 4.2+ 框架的扩展一起使用,则在容器中声明生产者和常规消费者的别名集合。这些用于基于声明的类型和参数名称进行参数自动注入。这允许你将先前的生产者示例更改为

public function indexAction($name, ProducerInterface $uploadPictureProducer)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $uploadPictureProducer->publish(serialize($msg));
}

参数的名称由配置中的生产者或消费者名称构成,并根据类型后缀以生产者或消费者单词。与容器项目命名约定不同,如果名称已经后缀,则不会重复后缀单词(生产者或消费者)。upload_picture 生产者密钥将更改为 $uploadPictureProducer 参数名称。upload_picture_producer 生产者密钥也将别名到 $uploadPictureProducer 参数名称。最好避免这种相似名称。

所有生产者都别名为 OldSound\RabbitMqBundle\RabbitMq\ProducerInterface 和配置中的生产者类选项。在沙箱模式下,仅创建 ProducerInterface 别名。强烈建议在为生产者注入参数时使用 ProducerInterface 类。

所有消费者都别名为 OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface%old_sound_rabbit_mq.consumer.class% 配置选项值。常规模式和沙箱模式之间没有区别。强烈建议在为客户端注入参数时使用 ConsumerInterface

回调

以下是一个回调示例

<?php

//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php

namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        //Process picture upload.
        //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.

        $isUploadSuccess = someUploadPictureMethod();
        if (!$isUploadSuccess) {
            // If your image upload failed due to a temporary error you can return false
            // from your callback so the message will be rejected by the consumer and
            // requeued by RabbitMQ.
            // Any other value not equal to false will acknowledge the message and remove it
            // from the queue
            return false;
        }
    }
}

如你所见,这只需要实现一个方法:ConsumerInterface::execute

请注意,你的回调必须作为普通 Symfony 服务进行注册。在那里,你可以注入服务容器、数据库服务、Symfony 日志记录器等。

有关消息实例的更多详细信息,请参阅https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md

要停止消费者,回调可以抛出 StopConsumerException(最后消费的消息将不会被确认)或 AckStopConsumerException(消息将被确认)。如果使用 demonized,例如 supervisor,消费者实际上将重新启动。

总结

仅为了发送消息就做了很多工作,让我们总结一下,以更好地了解我们需要什么来生产/消费消息

  • 在配置中为消费者/生产者添加条目。
  • 实现你的回调。
  • 从 CLI 启动消费者。
  • 在控制器中添加发布消息的代码。

就是这样!

审计/日志记录

这是一个要求跟踪收到的/发布的消息。为了启用此功能,你需要向消费者或发布者添加 enable_logger 配置。

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        enable_logger: true

如果你想,你还可以使用 monolog 中的不同处理程序处理队列的日志记录,通过引用通道 phpamqplib

RPC 或回复/响应

到目前为止,我们只是向消费者发送了消息,但如果我们想要从他们那里得到回复怎么办?为了实现这一点,我们必须在我们的应用程序中实现 RPC 调用。这个扩展使使用 Symfony 实现此类事情变得非常简单。

让我们在配置中添加 RPC 客户端和服务器

rpc_clients:
    integer_store:
        connection: default #default: default
        unserializer: json_decode #default: unserialize
        lazy: true #default: false
        direct_reply_to: false
rpc_servers:
    random_int:
        connection: default
        callback:   random_int_server
        qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
        exchange_options: {name: random_int, type: topic}
        queue_options: {name: random_int_queue, durable: false, auto_delete: true}
        serializer: json_encode

有关完整的配置参考,请使用 php app/console config:dump-reference old_sound_rabbit_mq 命令。

这里有一个非常有用的服务器:它向客户端返回随机整数。用于处理请求的回调将是 random_int_server 服务。现在让我们看看如何从我们的控制器中调用它。

首先,我们需要在命令行中启动服务器

$ ./app/console_dev rabbitmq:rpc-server random_int

然后,将以下代码添加到我们的控制器中

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
}

如您所见,如果我们的客户端 ID 是 integer_store,则服务名称将是 old_sound_rabbit_mq.integer_store_rpc。一旦我们得到该对象,我们将通过调用 addRequest 在服务器上放置一个请求,该函数期望三个参数

  • 要发送到远程过程调用的参数。
  • RPC 服务器的名称,在我们的例子中是 random_int
  • 我们调用时的请求标识符,在这个例子中是 request_id

我们发送的参数是 rand() 函数的 minmax 值。我们通过序列化一个数组来发送它们。如果我们的服务器期望 JSON 信息或 XML,我们将在这里发送这样的数据。

最后一部分是获取回复。我们的 PHP 脚本将阻塞,直到服务器返回一个值。变量 $replies 将是一个关联数组,其中每个来自服务器的回复都将包含在相应的 request_id 键中。

默认情况下,RPC 客户端期望响应是序列化的。如果您正在使用的服务器返回非序列化的结果,则将 RPC 客户端的 expect_serialized_response 选项设置为 false。例如,如果 integer_store 服务器没有序列化结果,则客户端设置如下

rpc_clients:
    integer_store:
        connection: default
        expect_serialized_response: false

您还可以为请求设置一个以毫秒为单位的过期时间,在此之后,消息将不再由服务器处理,客户端请求将简单地超时。设置消息过期时间仅适用于 RabbitMQ 3.x 及以上版本。有关更多信息,请访问 https://rabbitmq.cn/ttl.html#per-message-ttl

public function indexAction($name)
{
    $expiration = 5000; // milliseconds
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest($body, $server, $requestId, $routingKey, $expiration);
    try {
        $replies = $client->getReplies();
        // process $replies['request_id'];
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // handle timeout
    }
}

正如你所猜的,我们也可以进行 并行 RPC 调用

并行 RPC

假设为了渲染某个网页,你需要执行两个数据库查询,一个需要 5 秒完成,另一个需要 2 秒 —— 非常昂贵的查询。如果你按顺序执行它们,那么你的页面将在大约 7 秒后准备好交付。如果你并行运行它们,那么你将在大约 5 秒内提供页面。使用 RabbitMqBundle,我们可以轻松地执行这样的并行调用。让我们在配置中定义一个并行客户端和另一个 RPC 服务器

rpc_clients:
    parallel:
        connection: default
rpc_servers:
    char_count:
        connection: default
        callback:   char_count_server
    random_int:
        connection: default
        callback:   random_int_server

然后这段代码应该放在我们的控制器中

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.parallel_rpc');
    $client->addRequest($name, 'char_count', 'char_count');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int');
    $replies = $client->getReplies();
}

这与前面的例子非常相似,我们只是多了一个 addRequest 调用。我们还提供了有意义的请求标识符,这样我们就可以在 $replies 数组中更容易地找到我们想要的回复。

直接回复到客户端

要启用 直接回复到客户端,您只需在客户端的 rpc_clients 配置中启用选项 direct_reply_to

此选项将在执行 RPC 调用时使用伪队列 amq.rabbitmq.reply-to。在 RPC 服务器上不需要进行任何修改。

优先队列

自 3.5.0 版本起,RabbitMQ 在核心中实现了优先队列。任何队列都可以使用客户端提供的可选参数(但与使用可选参数的其他功能不同,不是策略)转换为优先队列。该实现支持有限数量的优先级:255。建议使用 1 到 10 之间的值。请参阅文档

以下是如何声明一个优先队列的方法

    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
            callback:         upload_picture_service

如果在运行 rabbitmq:setup-fabric 命令之前存在 upload-picture 队列,则必须先删除该队列

现在假设你想发送一条高优先级的消息,你必须发布消息并附加以下额外信息

public function indexAction()
{    
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $additionalProperties = ['priority' => 10] ; 
    $routing_key = '';
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg), $routing_key , $additionalProperties );
}

多个消费者

对于逻辑分离,拥有许多队列是一种好习惯。使用简单的消费者,你将为每个队列创建一个工作者(消费者),当处理许多演变时可能会很难管理(忘记在supervisord配置中添加一行?)。这对于小型队列也很有用,因为你可能不想拥有与队列一样多的工作者,并且想要将一些任务重新组合在一起,同时不失灵活性和分离原则。

多个消费者允许你在同一个消费者上监听多个队列来处理这种情况。

以下是设置具有多个队列的消费者的方法

multiple_consumers:
    upload:
        connection:       default
        exchange_options: {name: 'upload', type: direct}
        queues_provider: queues_provider_service
        queues:
            upload-picture:
                name:     upload_picture
                callback: upload_picture_service
                routing_keys:
                    - picture
            upload-video:
                name:     upload_video
                callback: upload_video_service
                routing_keys:
                    - video
            upload-stats:
                name:     upload_stats
                callback: upload_stats

回调现在指定在每个队列下,必须像简单消费者一样实现ConsumerInterface。消费者中的所有queues-options选项都适用于每个队列。

请注意,所有队列都在同一个交换机上,设置正确的回调路由取决于你。

queues_provider是一个可选服务,它动态提供队列。它必须实现QueuesProviderInterface

请注意,队列提供者是负责对setDequeuer进行正确调用,并且回调是可调用的(不是ConsumerInterface)。如果提供队列的服务实现了DequeuerAwareInterface,则会在服务的定义中添加对setDequeuer的调用,其中DequeuerInterface目前是一个MultipleConsumer

任意绑定

你可能发现你的应用程序有一个复杂的流程,你需要有任意的绑定。任意绑定场景可能包括通过destination_is_exchange属性实现的交换到交换的绑定。

bindings:
    - {exchange: foo, destination: bar, routing_key: 'baz.*' }
    - {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}

rabbitmq:setup-fabric命令将在创建任意绑定之前,声明在您的生产者、消费者和多消费者配置中定义的交换机和队列。然而,rabbitmq:setup-fabric将不会声明在绑定中定义的附加队列和交换机。确保交换机和队列已声明取决于你。

动态消费者

有时你必须动态更改消费者的配置。动态消费者允许你根据上下文程序化地定义消费者队列选项。

例如,在一个场景中,定义的消费者必须负责动态数量的主题,而你不想(或不能)每次都更改它的配置。

定义一个实现QueueOptionsProviderInterface的服务queue_options_provider,并将其添加到你的dynamic_consumers配置中。

dynamic_consumers:
    proc_logs:
        connection: default
        exchange_options: {name: 'logs', type: topic}
        callback: parse_logs_service
        queue_options_provider: queue_options_provider_service

示例用法

$ ./app/console rabbitmq:dynamic-consumer proc_logs server1

在这种情况下,proc_logs消费者为server1运行,并且它可以决定它使用的队列选项。

匿名消费者

现在,我们为什么需要匿名消费者呢?这听起来像某种互联网威胁或类似的东西...继续阅读。

在AMQP中,有一种称为topic的交换机类型,其中消息根据——你猜对了——消息的主题路由到队列。我们可以使用主题作为创建日志的主机名和日志的严重性将应用程序的日志发送到RabbitMQ主题交换机。消息体将是日志内容,我们的路由键将如下所示

  • server1.error
  • server2.info
  • server1.warning
  • ...

由于我们不希望将无限多的日志填满队列,我们可以做的是,当我们要监控系统时,可以启动一个消费者来创建队列,并基于某个主题附加到<强>日志交换机,例如,我们希望看到我们服务器报告的所有错误。路由键将类似于:<强>#.error。在这种情况下,我们需要想出一个队列名称,将其绑定到交换机,获取日志,取消绑定并删除队列。幸运的是,AMPQ提供了一种在声明和绑定队列时提供正确选项来自动完成此操作的方法。问题是你不想记住所有这些选项。因此,我们实现了<强>匿名消费者模式。

当我们启动匿名消费者时,它会处理这些细节,我们只需要考虑实现消息到达时的回调。之所以称为匿名,是因为它不会指定队列名称,但会等待RabbitMQ为其分配一个随机的名称。

现在,如何配置和运行这样的消费者?

anon_consumers:
    logs_watcher:
        connection:       default
        exchange_options: {name: 'app-logs', type: topic}
        callback:         logs_watcher

在这里,我们指定交换机名称和其类型,以及当消息到达时应执行的操作回调。

这个匿名消费者现在可以监听与同一交换机链接的主题类型的生产者。

    producers:
        app_logs:
            connection:       default
            exchange_options: {name: 'app-logs', type: topic}

要启动匿名消费者,我们使用以下命令

$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher

与之前看到的命令相比,唯一的新选项是指定<强>路由键:<代码>-r '#.error'

批量消费者

在某些情况下,你可能想要获取一批消息,然后对它们进行一些处理。批量消费者将允许你定义此类处理逻辑。

例如:假设你有一个队列,你收到一条消息,需要在数据库中插入一些信息,然后你意识到批量插入比逐个插入要好得多。

定义一个实现<代码>BatchConsumerInterface的回调服务,并将消费者的定义添加到你的配置中。

batch_consumers:
    batch_basic_consumer:
        connection:       default
        exchange_options: {name: 'batch', type: fanout}
        queue_options:    {name: 'batch'}
        callback:         batch.basic
        qos_options:      {prefetch_size: 0, prefetch_count: 2, global: false}
        timeout_wait:     5
        auto_setup_fabric: false
        idle_timeout_exit_code: -2
        keep_alive: false
        graceful_max_execution:
            timeout: 60

注意:如果将<代码>keep_alive选项设置为<代码>true,则忽略<代码>idle_timeout_exit_code,消费者进程将继续。

你可以实现一个批量消费者,它会在一个返回中确认所有消息,或者你可以控制确认哪些消息。

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        foreach ($messages as $message) {
            $this->executeSomeLogicPerMessage($message);
        }

        // you ack all messages got in batch
        return true; 
    }
}
namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        $result = [];
        /** @var AMQPMessage $message */
        foreach ($messages as $message) {
            $result[$message->getDeliveryTag()] = $this->executeSomeLogicPerMessage($message);
        }

        // you ack only some messages that have return true
        // e.g:
        // $return = [
        //      1 => true,
        //      2 => true,
        //      3 => false,
        //      4 => true,
        //      5 => -1,
        //      6 => 2,
        //  ];
        // The following will happen:
        //  * ack: 1,2,4
        //  * reject and requeq: 3
        //  * nack and requeue: 6
        //  * reject and drop: 5
        return $result;
    }
}

如何运行以下批量消费者

    $ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w

重要:批量消费者不会有<代码>-m|messages选项可用 重要:如果只想消费特定数量的批次并停止消费者,批量消费者也可以有<代码>-b|batches选项可用。仅当您希望消费者在消费了这些批次消息后停止时才提供批次数量!

STDIN生产者

有一个命令从STDIN读取数据并将其发布到RabbitMQ队列。要使用它,首先必须在配置文件中配置一个<代码>producer服务,如下所示

producers:
    words:
      connection:       default
      exchange_options: {name: 'words', type: direct}

该生产者将消息发布到<代码>words直接交换机。当然,你可以根据自己的需要调整配置。

然后,假设你想要发布一些XML文件的内容,以便由消费者农场进行处理。你可以使用以下命令发布它们

$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words

这意味着你可以使用简单的Unix命令组合生产者。

让我们分解这个单行命令

$ find vendor/symfony/ -name "*.xml" -print0

该命令将在symfony文件夹内找到所有<代码>.xml文件,并将文件名打印出来。然后,每个文件名通过<代码>xargs 管道传递给<代码>cat

$ xargs -0 cat

最后,<代码>cat的输出直接传递到我们的生产者,该生产者被调用如下

$ ./app/console rabbitmq:stdin-producer words

它只接受一个参数,即你在 config.yml 文件中配置的生产者名称。

其他命令

设置 RabbitMQ 框架

本组件的目的是让您的应用程序能够生产消息并将它们发布到您已配置的一些交换机。

在某些情况下,即使您的配置正确,您正在生产的消息也可能不会被路由到任何队列,因为队列不存在。负责队列消费的消费者必须运行才能创建队列。

当消费者数量较多时,为每个消费者启动一个命令可能会成为一个噩梦。

为了一次性创建交换机、队列和绑定,并确保不会丢失任何消息,您可以运行以下命令

$ ./app/console rabbitmq:setup-fabric

当需要时,您可以配置您的消费者和生产者假设 RabbitMQ 框架已经定义。为此,请将以下内容添加到您的配置中

producers:
    upload_picture:
      auto_setup_fabric: false
consumers:
    upload_picture:
      auto_setup_fabric: false

默认情况下,消费者或生产者在启动时会使用 RabbitMQ 声明它需要的所有内容。使用时要小心,当交换机或队列未定义时,将出现错误。当您更改任何配置时,您需要运行上述 setup-fabric 命令来声明您的配置。

如何贡献

要贡献,只需提交一个包含您新代码的 Pull Request,同时注意,如果您添加了新功能或修改了现有功能,您必须在 README 中记录它们的功能。如果您破坏了 BC,您也必须记录它。此外,您还必须更新 CHANGELOG。因此

  • 记录新功能。
  • 更新 CHANGELOG。
  • 记录 BC 破坏性更改。

许可证

见:resources/meta/LICENSE.md

鸣谢

该组件的结构和文档部分基于 RedisBundle