ecentria/rabbitmq-bundle

原名 oldsound/rabbitmq-bundle。集成了 php-amqplib、Symfony2 和 RabbitMq

安装: 0

依赖者: 0

建议者: 0

安全: 0

星星: 0

关注者: 8

分支: 469

类型:symfony-bundle

v1.8.0 2015-12-11 17:57 UTC

README

关于

RabbitMqBundle 通过 RabbitMQphp-amqplib 库将消息集成到您的应用程序中。

该组件实现了 Thumper 库中看到的一些消息模式。因此,从 Symfony2 控制器向 RabbitMQ 发布消息就像这样

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

稍后,当您想从 upload_pictures 队列中消费 50 条消息时,您只需在 CLI 中运行

$ ./app/console rabbitmq:consumer -m 50 upload_picture

所有示例都假设有一个正在运行的 RabbitMQ 服务器。

该组件在 Symfony Live Paris 2011 会议中展出。查看幻灯片 此处

Build Status

安装

对于 Symfony 框架 >= 2.3

使用 composer 需要该组件及其依赖项

$ composer require php-amqplib/rabbitmq-bundle

注册组件

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

享受!

对于使用 Symfony Console、依赖注入和 Config 组件的控制台应用程序

如果您有一个用于运行 RabbitMQ 消费者的控制台应用程序,则不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件来加载此组件配置和服务,然后使用消费者命令。

在您的 composer.json 文件中需要组件

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "~1.6",
    }
}

注册扩展和编译器传递

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

警告 - BC 破坏性变更

  • 自 2012-06-04 以来,"producers" 配置部分中声明的交换机的某些默认选项已更改,以匹配 "consumers" 部分中声明的交换机的默认选项。受影响的设置包括

    • durable 已从 false 更改为 true,
    • auto_delete 已从 true 更改为 false.

    如果依赖以前的默认值,则必须更新您的配置。

  • 自 2012-04-24 以来,ConsumerInterface::execute 方法签名已更改

  • 自 2012-01-03 以来,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是主体。有关更多详细信息,请参阅 CHANGELOG 文件。

使用方法

在您的配置文件中添加 old_sound_rabbit_mq 部分

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0
    producers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service

在此,我们配置了连接服务和我们的应用程序将拥有的消息端点。在此示例中,您的服务容器将包含服务 old_sound_rabbit_mq.upload_picture_producerold_sound_rabbit_mq.upload_picture_consumer。后者期望存在一个名为 upload_picture_service 的服务。

如果您没有为客户端指定连接,则客户端将查找具有相同别名的连接。因此,对于我们的 upload_picture,服务容器将查找一个名为 upload_picture 的连接。

如果您需要添加可选的队列参数,则您的队列选项可以是这样的

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

另一个具有 20 秒消息 TTL 的示例

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

参数值必须是一个数据类型和值的列表。有效数据类型包括

  • S - 字符串
  • I - 整数
  • D - 十进制
  • T - 时间戳
  • F - 表
  • A - 数组

根据您的需求调整 arguments

如果您想将队列绑定到特定的路由键,您可以在生产者或消费者配置中声明它。

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

重要提醒 - 懒连接

在Symfony环境中,所有服务在每个请求中都是完全启动的,从版本 >= 2.3 开始,您可以声明一个懒服务(Lazy Services)。这个包目前还不支持新的懒服务功能,但您可以在连接配置中设置 lazy: true 以避免在每个请求中不必要的连接到您的消息代理。出于性能原因,强烈建议使用懒连接,尽管默认情况下禁用了懒连接选项,以避免破坏已使用此包的应用程序。

重要提醒 - 心跳

read_write_timeout 设置为心跳的2倍,这样您的套接字就会保持打开状态。如果您不这样做,或者使用不同的乘数,存在风险,消费者套接字将超时。

生产者、消费者,什么是?

在消息传递应用程序中,将消息发送到代理的过程称为生产者,而接收这些消息的过程称为消费者。在您的应用程序中,您将拥有几个它们,您可以在配置的相应条目下列出它们。

生产者

生产者将用于将消息发送到服务器。在AMQP模型中,消息是发送到交换的,这意味着在生产者的配置中,您必须指定连接选项以及交换选项,通常将是交换的名字和类型。

现在假设您想后台处理图片上传。在将图片移动到最终位置后,您将向服务器发布以下信息的消息

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

如您所见,如果您的配置中有一个名为 upload_picture 的生产者,那么在服务容器中,您将有一个名为 old_sound_rabbit_mq.upload_picture_producer 的服务。

除了消息本身外,OldSound\RabbitMqBundle\RabbitMq\Producer#publish() 方法还接受一个可选的路由键参数和一个可选的附加属性数组。附加属性数组允许您更改默认构造 PhpAmqpLib\Message\AMQPMessage 对象时使用的属性。这样,例如,您可以更改应用程序的头部。

您可以使用 setContentTypesetDeliveryMode 方法分别设置消息的内容类型和消息的投递模式。默认值是内容类型为 text/plain 和投递模式为 2

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

如果您需要为生产者使用一个自定义类(该类应继承自 OldSound\RabbitMqBundle\RabbitMq\Producer),您可以使用 class 选项

    ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...

下一个难题是要有一个消费者,它将从队列中取出消息并相应地处理它。

消费者

消费者将连接到服务器,并启动一个等待处理传入消息的循环。根据为该消费者指定的回调,将决定其行为。让我们回顾上面的消费者配置

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

正如我们所看到的,回调 选项引用了 upload_picture_service。当消费者从服务器接收到消息时,它将执行这样的回调。如果您需要为测试或调试目的指定不同的回调,则可以更改它。

除了回调之外,我们还指定了要使用的连接方式,就像我们为生产者做的那样。其余选项是 exchange_optionsqueue_optionsexchange_options 应该与用于 生产者 的相同。在 queue_options 中,我们将提供 队列名称。为什么?

正如我们所说的,AMQP中的消息是发布到交换器的。这并不意味着消息已经到达队列。为了实现这一点,首先我们需要创建这样的队列,然后将其绑定到交换器上。酷的地方在于,你可以将多个队列绑定到一个交换器上,这样一条消息就可以到达多个目的地。这种方法的优点是解耦了生产者和消费者。生产者不关心有多少消费者会处理他的消息。他只需要确保消息到达服务器。这样我们就可以在不需要更改控制器中的代码的情况下,扩展每次上传图片时执行的操作。

现在,如何运行消费者?有一个命令可以执行,如下所示

$ ./app/console rabbitmq:consumer -m 50 upload_picture

这意味着什么?我们正在执行upload_picture消费者,并告诉它只消费50条消息。每次消费者从服务器接收到消息时,它都会执行配置的回调,并将AMQP消息作为PhpAmqpLib\Message\AMQPMessage类的实例传递。消息体可以通过调用$msg->body来获取。默认情况下,消费者将以无限循环的方式处理消息,这里的无限有特定的定义。

如果你想确保消费者在接收到Unix信号时立即执行完毕,你可以使用带有标志-w的命令。

$ ./app/console rabbitmq:consumer -w upload_picture

然后消费者将立即执行完毕。

要使用带有此标志的命令,你需要安装带有PCNTL扩展的PHP。

如果你想设置消费者内存限制,可以使用标志-l。在下面的示例中,此标志增加了256MB的内存限制。消费者将在达到256MB前的5MB处停止,以避免PHP内存大小错误。

$ ./app/console rabbitmq:consumer -l 256

如果你想删除队列中等待的所有消息,可以执行此命令来清空队列

$ ./app/console rabbitmq:purge --no-confirmation upload_picture

要删除消费者的队列,使用此命令

$ ./app/console rabbitmq:delete --no-confirmation upload_picture

空闲超时

如果你需要在一段时间内没有消息来自你的队列时设置超时,你可以设置idle_timeout(以秒为单位)

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        idle_timeout:     60

公平调度

你可能已经注意到,调度仍然没有完全按我们的意愿工作。例如,在有两个工作者的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作者将一直忙碌,而另一个工作者几乎不做任何工作。然而,RabbitMQ对此一无所知,仍然会平均分配消息。

这是因为RabbitMQ只在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地将每n条消息分发给第n个消费者。

为了解决这个问题,我们可以使用带有prefetch_count=1设置的basic.qos方法。这告诉RabbitMQ一次不要给工作者发送超过一条消息。换句话说,在工作者处理并确认了前一条消息之前,不要向工作者发送新消息。相反,它会将消息发送给下一个不忙碌的工作者。

来源:https://rabbitmq.cn/tutorials/tutorial-two-python.html

请注意,实现公平调度会引入延迟,这会影响性能(请参阅这篇博客)。但是,实现它允许你在队列增加时动态地水平扩展。你应该根据每条消息的处理时间和网络性能,相应地评估prefetch_size的正确值。

使用RabbitMqBundle,你可以这样为每个消费者配置qos_options

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}

回调

以下是一个回调示例

<?php

//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php

namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        //Process picture upload.
        //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.

        $isUploadSuccess = someUploadPictureMethod();
        if (!$isUploadSuccess) {
            // If your image upload failed due to a temporary error you can return false
            // from your callback so the message will be rejected by the consumer and
            // requeued by RabbitMQ.
            // Any other value not equal to false will acknowledge the message and remove it
            // from the queue
            return false;
        }
    }
}

如你所见,这就像实现一个方法:ConsumerInterface::execute一样简单。

请注意,您的回调函数需要作为正常的Symfony2服务进行注册。在那里,您可以注入服务容器、数据库服务、Symfony记录器等。

有关消息实例组成部分的更多详细信息,请参阅https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md

总结

仅发送消息就需要做很多工作,让我们回顾一下,以便有更好的概览。这是我们需要生成/消费消息的内容

  • 在配置中为消费者/生产者添加条目。
  • 实现您的回调函数。
  • 从命令行启动消费者。
  • 将发布消息的代码添加到控制器中。

就是这样!

RPC或回复/响应

到目前为止,我们只是向消费者发送消息,但如果我们想从他们那里获得回复怎么办?为了实现这一点,我们必须在我们的应用程序中实现RPC调用。这个包使得在Symfony2中实现这样的功能变得非常容易。

让我们在配置中添加RPC客户端和服务器。

rpc_clients:
    integer_store:
        connection: default
        unserializer: json_decode
rpc_servers:
    random_int:
        connection: default
        callback:   random_int_server
        qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
        exchange_options: {name: random_int, type: topic}
        queue_options: {name: random_int_queue, durable: false, auto_delete: true}
        serializer: json_encode

有关完整的配置参考,请使用php app/console config:dump-reference old_sound_rabbit_mq命令。

这里有一个非常有用的服务器:它向其客户端返回随机整数。用于处理请求的回调函数将是random_int_server服务。现在让我们看看如何从我们的控制器中调用它。

首先,我们必须从命令行启动服务器

$ ./app/console_dev rabbitmq:rpc-server random_int

然后,将以下代码添加到我们的控制器中

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
}

如你所见,如果我们的客户端ID是integer_store,那么服务名称将是old_sound_rabbit_mq.integer_store_rpc。一旦我们获得这个对象,我们通过调用期望三个参数的addRequest方法向服务器发送请求

  • 要发送到远程过程调用的参数。
  • RPC服务器名称,在我们的情况下为random_int
  • 我们调用的请求标识符,在这种情况下为request_id

我们要发送的参数是rand()函数的minmax值。我们通过序列化数组来发送它们。如果我们的服务器期望JSON信息或XML,我们将在这里发送这样的数据。

最后一部分是获取回复。我们的PHP脚本将阻塞,直到服务器返回一个值。变量$replies将是一个关联数组,其中每个来自服务器的回复都将包含在相应的request_id键中。

默认情况下,RPC客户端期望响应被序列化。如果服务器返回非序列化的结果,则将RPC客户端的expect_serialized_response选项设置为false。例如,如果integer_store服务器没有序列化结果,客户端将如下设置

rpc_clients:
    integer_store:
        connection: default
        expect_serialized_response: false

您还可以为请求设置秒数的过期时间,在此之后,服务器将不再处理消息,客户端请求将简单地超时。设置消息过期时间仅适用于RabbitMQ 3.x及更高版本。有关更多信息,请访问https://rabbitmq.cn/ttl.html#per-message-ttl

public function indexAction($name)
{
    $expiration = 5; // seconds
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest($body, $server, $requestId, $routingKey, $expiration);
    try {
        $replies = $client->getReplies();
        // process $replies['request_id'];
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // handle timeout
    }
}

正如你所猜测的,我们也可以进行并行RPC调用

并行RPC

假设为了渲染某些网页,你需要执行两个数据库查询,一个需要5秒完成,另一个需要2秒——非常昂贵的查询。如果你按顺序执行它们,那么你的页面将在大约7秒后准备好交付。如果你并行运行它们,那么你将在大约5秒内提供服务。使用RabbitMqBundle,我们可以轻松地执行此类并行调用。让我们在配置中定义一个并行客户端和另一个RPC服务器

rpc_clients:
    parallel:
        connection: default
rpc_servers:
    char_count:
        connection: default
        callback:   char_count_server
    random_int:
        connection: default
        callback:   random_int_server

然后,此代码应放在我们的控制器中

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.parallel_rpc');
    $client->addRequest($name, 'char_count', 'char_count');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int');
    $replies = $client->getReplies();
}

与上一个例子非常相似,我们只是多调用了一次 addRequest。我们还提供了有意义的请求标识符,这样我们以后在 $replies 数组中找到我们想要的回复就会更容易。

多个消费者

为逻辑分离创建很多队列是一种好的实践。使用简单的消费者,你需要为每个队列创建一个工作进程(消费者),当处理许多演变时(忘记在 supervisord 配置中添加一行?)可能会很困难。这也适用于小队列,因为你可能不想有与队列数量一样多的工作者,并且希望将一些任务重新组合在一起,同时不失灵活性和分离原则。

多个消费者允许你在同一个消费者上监听多个队列来处理这种情况。

以下是设置具有多个队列的消费者方法

multiple_consumers:
    upload:
        connection:       default
        exchange_options: {name: 'upload', type: direct}
        queues_provider: queues_provider_service
        queues:
            upload-picture:
                name:     upload_picture
                callback: upload_picture_service
                routing_keys:
                    - picture
            upload-video:
                name:     upload_video
                callback: upload_video_service
                routing_keys:
                    - video
            upload-stats:
                name:     upload_stats
                callback: upload_stats

回调现在指定在每个队列下,必须像简单消费者一样实现 ConsumerInterface。消费者中所有 queues-options 的选项都适用于每个队列。

请注意,所有队列都在同一个交换机下,设置回调的正确路由取决于你。

queues_provider 是一个可选的服务,它可以动态提供队列。它必须实现 QueuesProviderInterface

请注意,队列提供者负责对 setDequeuer 进行适当的调用,并且回调是可调用的(不是 ConsumerInterface)。如果提供队列的服务实现了 DequeuerAwareInterface,则会添加一个对 setDequeuer 的调用,该调用带有当前为 MultipleConsumerDequeuerInterface

动态消费者

有时你必须动态地更改消费者的配置。动态消费者允许你根据上下文以编程方式定义消费者队列选项。

例如,在一个场景中,定义的消费者必须负责动态数量的主题,并且你不想(或不能)每次都更改其配置。

定义一个实现 QueueOptionsProviderInterface 的服务 queue_options_provider,并将其添加到你的 dynamic_consumers 配置中。

dynamic_consumers:
    proc_logs:
        connection: default
        exchange_options: {name: 'logs', type: topic}
        callback: parse_logs_service
        queue_options_provider: queue_options_provider_service

示例用法

$ ./app/console rabbitmq:dynamic-consumer proc_logs server1

在这种情况下,proc_logs 消费者为 server1 运行,并且它可以决定它使用的队列选项。

匿名消费者

现在,我们为什么要使用匿名消费者呢?这听起来像某种互联网威胁或类似的东西……继续阅读。

在 AMQP 中,有一种名为 topic 的交换类型,消息根据——你猜对了——消息的主题被路由到队列。我们可以将有关我们应用程序的日志发送到使用主题的 RabbitMQ 主题交换机,该主题是创建日志的主机名和日志的严重性。消息体将是日志内容,我们的路由键将如下所示

  • server1.error
  • server2.info
  • server1.warning
  • ...

由于我们不希望用无限量的日志填满队列,所以当我们想要监控系统时,我们可以启动一个消费者,该消费者创建一个队列并将其绑定到基于某些主题的 logs 交换机,例如,我们希望看到我们的服务器报告的所有错误。路由键将类似于:#.error。在这种情况下,我们必须想出一个队列名称,将其绑定到交换机,获取日志,解绑它并删除队列。幸运的是,AMQP 提供了一种自动完成此操作的方法,前提是在声明和绑定队列时提供正确的选项。问题是,你不希望记住所有这些选项。因此,我们实现了 匿名消费者 模式。

当我们启动一个匿名消费者时,它会处理这些细节,我们只需考虑实现当消息到达时的回调。为什么叫匿名消费者,因为它不会指定队列名称,但会等待RabbitMQ给它分配一个随机的名称。

那么,如何配置和运行这样的消费者呢?

anon_consumers:
    logs_watcher:
        connection:       default
        exchange_options: {name: 'app-logs', type: topic}
        callback:         logs_watcher

在这里,我们指定了交换机名称和类型,以及当消息到达时应执行哪个回调。

现在,这个匿名消费者可以监听与相同交换机链接的、类型为topic的生产者。

    producers:
        app_logs:
            connection:       default
            exchange_options: {name: 'app-logs', type: topic}

要启动一个匿名消费者,我们使用以下命令

$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher

与之前看到的命令相比,唯一的新选项是指定路由键-r '#.error'

STDIN 生产者

有一个命令可以从STDIN读取数据并将其发布到RabbitMQ队列。要使用它,首先需要在配置文件中配置一个producer服务,如下所示

producers:
    words:
      connection:       default
      exchange_options: {name: 'words', type: direct}

该生产者将消息发布到words直接交换机。当然,你可以根据需要调整配置。

然后假设你想要发布一些XML文件的内容,以便由消费者农场处理。你可以使用如下命令来发布它们

$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words

这意味着你可以使用普通的Unix命令组合生产者。

让我们分解这一行

$ find vendor/symfony/ -name "*.xml" -print0

该命令将在symfony文件夹内找到所有的.xml文件,并打印出文件名。然后,每个文件名通过xargs传递给cat

$ xargs -0 cat

最后,cat的输出直接传递给我们的生产者,该生产者以如下方式调用

$ ./app/console rabbitmq:stdin-producer words

它只接受一个参数,即你在config.yml文件中配置的生产者名称。

其他命令

设置RabbitMQ结构

该捆绑包的目的是让您的应用程序产生消息并将它们发布到您配置的某些交换机。

在某些情况下,即使配置正确,您产生的消息也不会路由到任何队列,因为没有队列存在。负责队列消费的消费者必须运行才能创建队列。

当消费者数量很高时,为每个消费者启动一个命令可能是一个噩梦。

为了一次性创建交换机、队列和绑定,并确保不会丢失任何消息,您可以运行以下命令

$ ./app/console rabbitmq:setup-fabric

当需要时,您可以配置您的消费者和生产者,使它们假定RabbitMQ结构已经定义。为此,请将以下内容添加到您的配置中

producers:
    upload_picture:
      auto_setup_fabric: false
consumers:
    upload_picture:
      auto_setup_fabric: false

默认情况下,消费者或生产者在启动时会声明它需要的所有内容。请小心使用此功能,当交换机或队列未定义时,将出现错误。当您更改任何配置时,您需要运行上述setup-fabric命令来声明您的配置。

如何贡献

要贡献,只需提交一个包含您新代码的Pull Request,注意如果您添加了新功能或修改了现有功能,您必须在本README中说明它们的功能。如果您破坏了BC,您也必须记录它。此外,您还必须更新CHANGELOG。所以

  • 记录新功能。
  • 更新CHANGELOG。
  • 记录BC破坏性更改。

许可证

见:resources/meta/LICENSE.md

鸣谢

该捆绑包的结构和文档部分基于RedisBundle