josemaestre/rabbitmq-bundle

以前为 oldsound/rabbitmq-bundle。集成了 php-amqplib、Symfony2 和 RabbitMq

安装: 11

依赖者: 1

建议者: 0

安全: 0

星级: 0

关注者: 1

分支: 469

类型:symfony-bundle

v1.8.0 2015-12-11 17:57 UTC

This package is auto-updated.

Last update: 2024-09-20 19:19:54 UTC


README

关于

RabbitMqBundle 通过 RabbitMQ 使用 php-amqplib 库将消息集成到您的应用程序中。

该组件实现了 Thumper 库中看到的几个消息模式。因此,从 Symfony2 控制器向 RabbitMQ 发布消息就像这样

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

稍后当您想要从 upload_pictures 队列中消费 50 条消息时,只需在 CLI 上运行

$ ./app/console rabbitmq:consumer -m 50 upload_picture

所有示例都假定有一个正在运行的 RabbitMQ 服务器。

此组件在 Symfony Live Paris 2011 会议中展出。查看幻灯片 这里

Build Status

安装

对于 Symfony 框架 >= 2.3

使用 composer 需求组件及其依赖项

$ composer require php-amqplib/rabbitmq-bundle

注册组件

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

享受!

对于使用 Symfony Console、依赖注入和配置组件的控制台应用程序

如果您有一个用于运行 RabbitMQ 消费者的控制台应用程序,则不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件来加载此组件配置和服务,然后使用消费者命令。

在您的 composer.json 文件中要求组件

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "~1.6",
    }
}

注册扩展和编译器传递

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

警告 - 兼容性破坏更改

  • 自 2012-06-04 以来,"producers" 配置部分中声明的交换机的某些默认选项已更改,以匹配 "consumers" 部分中声明的交换机的默认值。受影响的设置是

    • durablefalse 更改为 true
    • auto_deletetrue 更改为 false

    如果您依赖于以前的默认值,则必须更新您的配置。

  • 自 2012-04-24 以来,ConsumerInterface::execute 方法签名已更改

  • 自 2012-01-03 以来,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是正文。有关更多详细信息,请参阅 CHANGELOG 文件。

用法

在您的配置文件中添加 old_sound_rabbit_mq 部分

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0
    producers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service

在这里,我们配置连接服务和我们应用程序将拥有的消息端点。在这个例子中,您的服务容器将包含服务 old_sound_rabbit_mq.upload_picture_producerold_sound_rabbit_mq.upload_picture_consumer。后者期望存在一个名为 upload_picture_service 的服务。

如果您没有指定客户端的连接,客户端将寻找具有相同别名的连接。所以对于我们的 upload_picture,服务容器将寻找一个 upload_picture 连接。

如果您需要添加可选的队列参数,则您的队列选项可以是这样的

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

另一个示例,具有 20 秒的消息 TTL

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

参数值必须是一个数据类型和值的列表。有效数据类型是

  • S - 字符串
  • I - 整数
  • D - 小数
  • T - 时间戳
  • F - 表格
  • A - 数组

根据您的需求调整 arguments

如果您想将队列与特定的路由键绑定,您可以在生产者或消费者配置中声明它

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

重要提示 - 懒连接

在 Symfony 环境中,所有服务在每个请求时都会完全启动,从版本 >= 2.3 开始,您可以声明一个服务为延迟加载(延迟加载服务)。这个包仍然不支持新的延迟加载服务功能,但您可以在连接配置中设置 lazy: true 来避免在每个请求中不必要地连接到您的消息代理。出于性能考虑,强烈建议使用延迟连接,尽管默认情况下禁用了延迟选项,以避免影响已使用此包的应用程序。

导入注意事项 - 心跳

read_write_timeout 设置为心跳的两倍是一个好主意,这样您的套接字就会保持开启状态。如果您不这样做,或者使用不同的乘数,存在 消费者 套接字超时的风险。

生产者、消费者,什么是?

在消息应用程序中,将消息发送到代理的过程称为 生产者,而接收这些消息的过程称为 消费者。在您的应用程序中,您将拥有多个它们,您可以在配置中的相应条目下列出它们。

生产者

生产者将用于向服务器发送消息。在 AMQP 模型中,消息被发送到 交换机,这意味着在生产者的配置中,您将必须指定连接选项以及交换机选项,这通常将是交换机的名称和类型。

现在假设您想在后台处理图片上传。在将图片移动到其最终位置后,您将向服务器发布以下信息的消息

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

如您所见,如果您的配置中有一个名为 upload_picture 的生产者,那么在服务容器中,您将有一个名为 old_sound_rabbit_mq.upload_picture_producer 的服务。

除了消息本身之外,OldSound\RabbitMqBundle\RabbitMq\Producer#publish() 方法还接受一个可选的路由键参数和一个可选的额外属性数组。额外属性数组允许您更改默认情况下通过 PhpAmqpLib\Message\AMQPMessage 对象构造的属性。这样,例如,您可以更改应用程序头信息。

您可以使用 setContentTypesetDeliveryMode 方法分别设置消息内容类型和消息投递模式。默认值是内容类型为 text/plain,投递模式为 2

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

如果您需要为生产者使用自定义类(应继承自 OldSound\RabbitMqBundle\RabbitMq\Producer),则可以使用 class 选项

    ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...

拼图中下一部分是要有一个消费者,它会从队列中取出消息并相应地处理它。

消费者

消费者将连接到服务器,并开始一个 循环,等待传入的消息进行处理。根据指定的消费者 回调,其行为将有所不同。让我们回顾一下上面的消费者配置

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

正如我们所看到的,callback 选项引用了 upload_picture_service。当消费者从服务器接收到消息时,它将执行该回调。如果您需要指定不同的回调以进行测试或调试,则可以更改它。

除了回调之外,我们还指定了要使用的连接,方式与使用 生产者 时相同。其余选项是 exchange_optionsqueue_optionsexchange_options 应与用于 生产者 的相同。在 queue_options 中,我们将提供 队列名称。为什么?

正如我们所言,AMQP中的消息是发布到交换机的。这并不意味着消息已经到达队列。为了实现这一点,我们首先需要创建这样一个队列,然后将其绑定到交换机。酷的地方在于,您可以绑定多个队列到一个交换机,这样一条消息就可以到达多个目的地。这种方法的优点是实现生产者和消费者的解耦。生产者不关心将有多少消费者处理他的消息。它所需要的只是他的消息到达服务器。这样我们就可以在不需要更改控制器代码的情况下,扩展每次图片上传时执行的操作。

现在,如何运行消费者?有一个命令可以执行,如下所示

$ ./app/console rabbitmq:consumer -m 50 upload_picture

这意味着什么?我们正在执行upload_picture消费者,并指示它只消费50条消息。每次消费者从服务器接收消息时,它都会执行配置的回调,将AMQP消息作为PhpAmqpLib\Message\AMQPMessage类的实例传递。可以通过调用$msg->body来获取消息体。默认情况下,消费者将以无限循环的方式处理消息,这里的无限有一些定义。

如果您想确保消费者在Unix信号到来时立即执行完毕,可以使用带有标志-w的命令。

$ ./app/console rabbitmq:consumer -w upload_picture

然后消费者将立即执行完毕。

要使用带有此标志的命令,您需要安装带有PCNTL扩展的PHP。

如果您想设置消费者内存限制,可以使用标志-l。在下面的示例中,此标志添加了256MB的内存限制。消费者将在达到256MB前停止5MB,以避免PHP允许的内存大小错误。

$ ./app/console rabbitmq:consumer -l 256

如果您想删除队列中所有等待的消息,可以执行此命令来清空此队列

$ ./app/console rabbitmq:purge --no-confirmation upload_picture

要删除消费者的队列,使用此命令

$ ./app/console rabbitmq:delete --no-confirmation upload_picture

空闲超时

如果您需要在一段时间内没有来自队列的消息时设置超时,可以设置idle_timeout(以秒为单位)

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        idle_timeout:     60

公平分发

您可能已经注意到,分发仍然没有完全按我们的预期工作。例如,在有两个工人的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工人会一直很忙,而另一个工人几乎不做任何工作。RabbitMQ对此一无所知,仍然会平均分发消息。

这是因为RabbitMQ只在消息进入队列时分发消息。它不会查看消费者未确认的消息数量。它只是盲目地将每n条消息分发给第n个消费者。

为了克服这一点,我们可以使用带有prefetch_count=1设置的basic.qos方法。这告诉RabbitMQ一次不要给一个工人发送多于一条消息。换句话说,在工人处理并确认了之前的消息之前,不要给工人发送新消息。相反,它会将其分发给下一个不忙碌的工人。

来源:https://rabbitmq.cn/tutorials/tutorial-two-python.html

请注意,实现公平分发会引入延迟,这会影响性能(请参阅这篇博客文章)。但实现它允许您在队列增加时动态水平扩展。您应该像博客文章建议的那样,根据处理每条消息所需的时间和您的网络性能来评估prefetch_size的正确值。

使用RabbitMqBundle,您可以按以下方式为每个消费者配置qos_options

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}

回调

以下是一个回调示例

<?php

//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php

namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        //Process picture upload.
        //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.

        $isUploadSuccess = someUploadPictureMethod();
        if (!$isUploadSuccess) {
            // If your image upload failed due to a temporary error you can return false
            // from your callback so the message will be rejected by the consumer and
            // requeued by RabbitMQ.
            // Any other value not equal to false will acknowledge the message and remove it
            // from the queue
            return false;
        }
    }
}

如您所见,这非常简单,只需实现一个方法:ConsumerInterface::execute

请注意,您的回调必须作为正常Symfony2服务进行注册。在那里您可以注入服务容器、数据库服务、Symfony记录器等。

有关消息实例的详细信息,请参阅https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md

摘要

仅仅为了发送消息似乎要做很多工作,让我们回顾一下,以便更好地了解我们需要什么来生产/消费消息。

  • 在配置中添加消费者/生产者的条目。
  • 实现您的回调。
  • 从命令行启动消费者。
  • 将发布消息的代码添加到控制器中。

就这样!

RPC或回复/响应

到目前为止,我们只是向消费者发送了消息,但如果我们想从他们那里获取回复呢?为了实现这一点,我们必须在我们的应用程序中实现RPC调用。这个包使得使用Symfony2实现此类事情变得非常简单。

让我们将RPC客户端和服务器添加到配置中。

rpc_clients:
    integer_store:
        connection: default
        unserializer: json_decode
rpc_servers:
    random_int:
        connection: default
        callback:   random_int_server
        qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
        exchange_options: {name: random_int, type: topic}
        queue_options: {name: random_int_queue, durable: false, auto_delete: true}
        serializer: json_encode

有关完整配置参考,请使用php app/console config:dump-reference old_sound_rabbit_mq命令。

这里有一个非常有用的服务器:它向其客户端返回随机整数。用于处理请求的回调将是random_int_server服务。现在让我们看看如何从我们的控制器中调用它。

首先,我们必须从命令行启动服务器

$ ./app/console_dev rabbitmq:rpc-server random_int

然后,将以下代码添加到我们的控制器中

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
}

如您所见,如果我们的客户端ID是integer_store,那么服务名称将是old_sound_rabbit_mq.integer_store_rpc。一旦我们得到这个对象,我们就通过调用addRequest来向服务器发送请求,它期望三个参数

  • 要发送给远程过程调用的参数。
  • RPC服务器的名称,在我们的例子中是random_int
  • 我们调用的请求标识符,在这个例子中是request_id

我们发送的参数是rand()函数的minmax值。我们通过序列化一个数组来发送它们。如果我们的服务器期望JSON信息或XML,我们将在这里发送此类数据。

最后一部分是获取回复。我们的PHP脚本将阻塞,直到服务器返回一个值。变量$replies将是一个关联数组,其中每个来自服务器的回复都包含在相应的request_id键中。

默认情况下,RPC客户端期望响应被序列化。如果您的工作服务器返回非序列化的结果,则将RPC客户端的expect_serialized_response选项设置为false。例如,如果integer_store服务器没有序列化结果,则客户端将设置如下

rpc_clients:
    integer_store:
        connection: default
        expect_serialized_response: false

您还可以为请求设置秒数过期时间,在此之后,服务器将不再处理消息,客户端请求将简单地超时。消息的过期设置仅适用于RabbitMQ 3.x及更高版本。有关更多信息,请访问https://rabbitmq.cn/ttl.html#per-message-ttl

public function indexAction($name)
{
    $expiration = 5; // seconds
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest($body, $server, $requestId, $routingKey, $expiration);
    try {
        $replies = $client->getReplies();
        // process $replies['request_id'];
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // handle timeout
    }
}

如你所料,我们也可以进行并行RPC调用

并行RPC

假设在渲染某个网页时,您需要进行两次数据库查询,一次需要5秒,另一次需要2秒——非常昂贵的查询。如果您按顺序执行它们,那么您的页面将在大约7秒后准备好交付。如果您并行运行它们,那么您将在大约5秒内得到您的页面。使用RabbitMqBundle,我们可以轻松地执行这样的并行调用。让我们在配置中定义一个并行客户端,另一个RPC服务器。

rpc_clients:
    parallel:
        connection: default
rpc_servers:
    char_count:
        connection: default
        callback:   char_count_server
    random_int:
        connection: default
        callback:   random_int_server

然后这段代码应该放在我们的控制器中。

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.parallel_rpc');
    $client->addRequest($name, 'char_count', 'char_count');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int');
    $replies = $client->getReplies();
}

与上一个例子非常相似,我们只是多了一个 addRequest 调用。我们还提供了有意义的请求标识符,这样以后在 $replies 数组中找到我们想要的回复就会容易得多。

多个消费者

拥有许多队列以实现逻辑分离是一种很好的做法。使用简单的消费者,您必须为每个队列创建一个工作者(消费者),在处理许多演变(忘记在您的supervisord配置中添加一行)时可能会很困难。这对于小型队列也很有用,您可能不想有与队列一样多的工作者,并希望将一些任务分组在一起,同时不失灵活性和分离原则。

多个消费者允许您在同一个消费者上监听多个队列。

以下是如何设置具有多个队列的消费者。

multiple_consumers:
    upload:
        connection:       default
        exchange_options: {name: 'upload', type: direct}
        queues_provider: queues_provider_service
        queues:
            upload-picture:
                name:     upload_picture
                callback: upload_picture_service
                routing_keys:
                    - picture
            upload-video:
                name:     upload_video
                callback: upload_video_service
                routing_keys:
                    - video
            upload-stats:
                name:     upload_stats
                callback: upload_stats

回调现在指定在每个队列下,必须像简单消费者一样实现 ConsumerInterface。消费者中 queues-options 的所有选项都适用于每个队列。

请注意,所有队列都在同一个交换机下,设置正确的回调路由取决于您。

queues_provider 是一个可选的服务,它动态提供队列。它必须实现 QueuesProviderInterface

请注意,队列提供者负责对 setDequeuer 进行适当的调用,并且回调是可调用的(不是 ConsumerInterface)。如果提供队列的服务实现了 DequeuerAwareInterface,则会在服务的定义中添加对 setDequeuer 的调用,其中 DequeuerInterface 当前是一个 MultipleConsumer

动态消费者

有时您必须动态更改消费者的配置。动态消费者允许您根据上下文以编程方式定义消费者队列选项。

例如,在定义的消费者必须负责动态数量的主题,并且您不想(或不能)每次都更改其配置的场景中。

定义一个实现 QueueOptionsProviderInterface 的服务 queue_options_provider,并将其添加到 dynamic_consumers 配置中。

dynamic_consumers:
    proc_logs:
        connection: default
        exchange_options: {name: 'logs', type: topic}
        callback: parse_logs_service
        queue_options_provider: queue_options_provider_service

示例用法

$ ./app/console rabbitmq:dynamic-consumer proc_logs server1

在这种情况下,proc_logs 消费者为 server1 运行,并且它可以根据其使用的队列选项做出决定。

匿名消费者

现在,我们为什么需要匿名消费者呢?这听起来像某种互联网威胁或类似的东西...继续阅读。

在AMQP中,有一种名为 topic 的交换机类型,其中消息根据——您猜对了——消息的主题被路由到队列。我们可以将关于我们应用程序的日志发送到RabbitMQ主题交换机,将主题设置为创建日志的主机名和日志的严重性。消息体将是日志内容,我们的路由键将是这样的

  • server1.error
  • server2.info
  • server1.warning
  • ...

由于我们不希望用无限多的日志填满队列,因此我们可以在需要监控系统时启动一个消费者,该消费者创建一个队列并基于某个主题附加到日志交换机,例如,我们想查看服务器报告的所有错误。路由键将类似于:#.error。在这种情况下,我们需要想出一个队列名称,将其绑定到交换机,获取日志,解绑并删除队列。幸运的是,AMPQ提供了一种在声明和绑定队列时提供正确选项来自动执行此操作的方法。问题是您不需要记住所有这些选项。因此,我们实现了匿名消费者模式。

当我们启动匿名消费者时,它将处理这些细节,我们只需要考虑实现消息到达时的回调。之所以称为匿名,是因为它不会指定队列名称,但会等待RabbitMQ随机分配一个。

现在,如何配置和运行这样的消费者?

anon_consumers:
    logs_watcher:
        connection:       default
        exchange_options: {name: 'app-logs', type: topic}
        callback:         logs_watcher

在这里,我们指定交换机名称及其类型,以及当消息到达时应执行回调。

现在,这个匿名消费者能够监听与相同交换机链接的、类型为topic的生产者。

    producers:
        app_logs:
            connection:       default
            exchange_options: {name: 'app-logs', type: topic}

要启动匿名消费者,请使用以下命令

$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher

与之前看到的命令相比,唯一的区别是指定了路由键-r '#.error'

STDIN 生产者

有一个命令从STDIN读取数据并将其发布到RabbitMQ队列。要使用它,首先您需要在配置文件中配置一个producer服务,如下所示

producers:
    words:
      connection:       default
      exchange_options: {name: 'words', type: direct}

该生产者将消息发布到words直接交换机。当然,您可以根据需要调整配置。

然后,假设您想发布一些XML文件的内容,以便它们被消费者处理。您可以使用以下命令发布它们

$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words

这意味着您可以使用普通的Unix命令组合生产者。

让我们分解这个单行命令

$ find vendor/symfony/ -name "*.xml" -print0

该命令将在symfony文件夹中查找所有.xml文件,并将文件名打印出来。然后,这些文件名通过xargs逐个传递给cat

$ xargs -0 cat

最后,cat的输出直接传递到我们的生产者,该生产者的调用方式如下

$ ./app/console rabbitmq:stdin-producer words

它只接受一个参数,即生产者名称,您可以在您的config.yml文件中配置它。

其他命令

设置RabbitMQ fabric

该bundle的目的是让您的应用程序生产消息并将它们发布到您配置的某些交换机上。

在某些情况下,即使您的配置正确,您生产的消息也可能不会路由到任何队列,因为没有队列存在。负责队列消费的消费者必须运行,队列才会被创建。

当消费者数量很多时,为每个消费者启动一个命令可能是一个噩梦。

为了一次性创建交换机、队列和绑定,并确保不会丢失任何消息,您可以运行以下命令

$ ./app/console rabbitmq:setup-fabric

当需要时,您可以配置您的消费者和生产者假设RabbitMQ fabric已经定义。为此,请将以下内容添加到您的配置中

producers:
    upload_picture:
      auto_setup_fabric: false
consumers:
    upload_picture:
      auto_setup_fabric: false

默认情况下,消费者或生产者在启动时将声明它所需的所有内容与RabbitMQ。请小心使用此功能,当交换机或队列未定义时,将出现错误。当您更改任何配置时,您需要运行上述setup-fabric命令来声明您的配置。

如何贡献

要贡献代码,只需提交一个包含新代码的拉取请求。请注意,如果您添加新功能或修改现有功能,您必须在README中说明它们的功能。如果您破坏了向后兼容性(BC),也必须进行说明。此外,您还需要更新CHANGELOG。因此

  • 记录新功能。
  • 更新CHANGELOG。
  • 记录破坏向后兼容性的更改。

许可证

请参阅:资源/元/LICENSE.md

致谢

捆绑结构和文档部分基于RedisBundle