oldsound / rabbitmq-bundle
以前为 oldsound/rabbitmq-bundle。集成了 php-amqplib 与 Symfony2 和 RabbitMq
Requires
- php: >=5.3.0
- symfony/config: ~2.3 || ~3.0
- symfony/console: ~2.3 || ~3.0
- symfony/dependency-injection: ~2.3 || ~3.0
- symfony/event-dispatcher: ~2.3 || ~3.0
- symfony/yaml: ~2.3 || ~3.0
- videlalvaro/php-amqplib: ~2.6
Requires (Dev)
- phpunit/phpunit: >=3.7.0
- symfony/debug: ~2.3 || ~3.0
- symfony/serializer: ~2.3 || ~3.0
Suggests
- symfony/framework-bundle: To use this lib as a full Symfony Bundle and to use the profiler data collector
README
关于
RabbitMqBundle 通过 RabbitMQ 和 php-amqplib 库在您的应用程序中实现消息传递。
此包实现了在 Thumper 库中看到的几个消息模式。因此,从 Symfony2 控制器向 RabbitMQ 发布消息就像
$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
稍后当您想从 upload_pictures
队列中消费 50 条消息时,只需在 CLI 上运行
$ ./app/console rabbitmq:consumer -m 50 upload_picture
所有示例都假设有一个正在运行的 RabbitMQ 服务器。
此包在 Symfony Live Paris 2011 会议中展示。查看幻灯片 这里。
安装
对于 Symfony 框架 >= 2.3
使用 composer 需要此包及其依赖项
$ composer require php-amqplib/rabbitmq-bundle
注册包
// app/AppKernel.php public function registerBundles() { $bundles = array( new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ); }
享受吧!
对于使用 Symfony Console、依赖注入和 Config 组件的控制台应用程序
如果您有一个用于运行 RabbitMQ 消费者的控制台应用程序,您不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件来加载此包配置和服务,然后使用消费者命令。
在您的 composer.json 文件中需要此包
{
"require": {
"php-amqplib/rabbitmq-bundle": "~1.6",
}
}
注册扩展和编译器传递
use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension; use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass; // ... $containerBuilder->registerExtension(new OldSoundRabbitMqExtension()); $containerBuilder->addCompilerPass(new RegisterPartsPass());
注意 - BC 破坏性更改
-
自 2012-06-04 以来,某些在“生产者”配置部分声明的交换机的默认选项已更改为与“消费者”部分中声明的交换机的默认选项匹配。受影响的设置是
durable
已从false
更改为true
,auto_delete
已从true
更改为false
。
如果您的配置依赖于以前的默认值,则必须更新您的配置。
-
自 2012-04-24 以来,ConsumerInterface::execute 方法签名已更改
-
自 2012-01-03 以来,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是体。有关更多详细信息,请参阅 CHANGELOG 文件。
用法
在您的配置文件中添加 old_sound_rabbit_mq
部分
old_sound_rabbit_mq: connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: '/' lazy: false connection_timeout: 3 read_write_timeout: 3 # requires php-amqplib v2.4.1+ and PHP5.4+ keepalive: false # requires php-amqplib v2.4.1+ heartbeat: 0 producers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service
在这里,我们配置了连接服务和应用程序将拥有的消息端点。在此示例中,您的服务容器将包含服务 old_sound_rabbit_mq.upload_picture_producer
和 old_sound_rabbit_mq.upload_picture_consumer
。后者期望存在名为 upload_picture_service
的服务。
如果您没有为客户端指定连接,客户端将寻找具有相同别名的连接。所以对于我们的 upload_picture
,服务容器将寻找一个 upload_picture
连接。
如果您需要添加可选的队列参数,那么您的队列选项可以如下所示
queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
另一个示例,消息TTL为20秒
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
参数值必须是数据类型和值的列表。有效数据类型有
S
- 字符串I
- 整数D
- 十进制T
- 时间戳F
- 表A
- 数组
根据您的需要调整 arguments
。
如果您想将队列与特定的路由键绑定,可以在生产者或消费者配置中声明它
queue_options: name: "upload-picture" routing_keys: - 'android.#.upload' - 'iphone.upload'
重要提示 - 懒连接
在Symfony环境中,所有服务在每次请求时都会完全启动,从版本 >= 2.3开始,您可以声明一个服务为懒加载(懒加载服务)。这个包仍然不支持新的懒加载服务功能,但您可以在连接配置中设置 lazy: true
以避免在每次请求中不必要的连接到消息代理。由于性能原因,强烈建议使用懒加载连接,尽管默认情况下禁用了懒加载选项,以避免在已使用此包的应用程序中可能出现的中断。
导入提示 - 心跳
将 read_write_timeout
设置为心跳的两倍是一个好主意,这样您的套接字就会保持打开。如果您不这样做或使用不同的乘数,存在风险,消费者套接字将超时。
生产者、消费者、什么?
在消息应用中,将消息发送到代理的过程称为 生产者,而接收这些消息的过程称为 消费者。在您的应用程序中,您将拥有几个它们,您可以在配置的相应条目下列出它们。
生产者
生产者将用于向服务器发送消息。在AMQP模型中,消息发送到 交换机,这意味着在生产者的配置中,您将需要指定连接选项以及交换机选项,这通常将是交换机的名称和类型。
现在假设您想要在后台处理图片上传。在将图片移动到其最终位置后,您将向服务器发布以下信息的消息
public function indexAction($name) { $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg)); }
如您所见,如果在您的配置中有一个名为 upload_picture 的生产者,那么在服务容器中,您将有一个名为 old_sound_rabbit_mq.upload_picture_producer 的服务。
除了消息本身之外,OldSound\RabbitMqBundle\RabbitMq\Producer#publish()
方法还接受一个可选的路由键参数和一个可选的附加属性数组。附加属性数组允许您更改默认情况下通过 PhpAmqpLib\Message\AMQPMessage
对象构建的属性。这样,例如,您可以更改应用程序头。
您可以使用 setContentType 和 setDeliveryMode 方法来分别设置消息内容类型和消息投递模式。默认值是内容类型为 text/plain 和投递模式为 2。
$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');
如果您需要使用一个自定义类作为生产者(该类应继承自 OldSound\RabbitMqBundle\RabbitMq\Producer
),则可以使用 class
选项
... producers: upload_picture: class: My\Custom\Producer connection: default exchange_options: {name: 'upload-picture', type: direct} ...
下一个难题是要有一个消费者,它会从队列中取出消息并相应地处理它。
消费者
消费者将连接到服务器并启动一个等待接收消息处理的循环。这样的消费者将根据指定的回调行为不同。让我们回顾一下上面的消费者配置
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service
如我们所见,其中的回调选项引用了一个upload_picture_service。当消费者从服务器接收到消息时,它会执行这样的回调。如果您需要指定不同的回调以进行测试或调试,则可以在此处更改它。
除了回调外,我们还指定了要使用的连接,这与使用生产者的方式相同。其余选项是exchange_options和queue_options。exchange_options应该与用于生产者的相同。在queue_options中,我们将提供队列名称。为什么?
正如我们所说的,AMQP中的消息是发布到交换机的。这并不意味着消息已经到达队列。为此,我们首先需要创建这样的队列,并将其绑定到交换机。酷的地方在于,你可以将多个队列绑定到一个交换机上,这样一条消息就可以到达多个目的地。这种方法的优点是实现了生产者和消费者的解耦。生产者不在乎将有多少消费者处理他的消息。他只需要确保他的消息到达服务器。这样,我们就可以在不更改控制器中的代码的情况下,扩展每次上传图片时执行的操作。
现在,如何运行消费者?有一个命令可以执行,如下所示
$ ./app/console rabbitmq:consumer -m 50 upload_picture
这是什么意思?我们正在执行upload_picture
消费者,并指示它只消费50条消息。每次消费者从服务器接收到消息时,它将执行配置的回调,并将AMQP消息作为PhpAmqpLib\Message\AMQPMessage
类的实例传递。可以通过调用$msg->body
来获取消息正文。默认情况下,消费者将以一些定义的无限方式无限循环地处理消息。
如果您想确保消费者在Unix信号下立即完成执行,可以使用带有标志-w
的命令。
$ ./app/console rabbitmq:consumer -w upload_picture
然后消费者将立即完成执行。
要使用带有此标志的命令,您需要安装带有PCNTL扩展的PHP。
如果您想设置消费者内存限制,可以使用带有标志-l
来执行此操作。在下面的示例中,此标志添加了256MB的内存限制。消费者将在达到256MB之前停止5MB,以避免PHP内存大小限制错误。
$ ./app/console rabbitmq:consumer -l 256
如果您想删除队列中等待的所有消息,可以执行此命令来清空此队列
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
要删除消费者的队列,请使用此命令
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
空闲超时
如果您需要在一段时间内没有消息从您的队列中时设置超时,可以将秒数设置为idle_timeout
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service idle_timeout: 60
公平调度
您可能已经注意到,调度仍然没有完全符合我们的期望。例如,在一个有两个工作者的情况下,如果所有奇数消息都很重而偶数消息都很轻,一个工作者会一直很忙,而另一个工作者几乎不会做任何工作。RabbitMQ对此一无所知,仍然会均匀地调度消息。
这种情况发生是因为RabbitMQ只在消息进入队列时才分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将每第n条消息分发给第n个消费者。
为了克服这一点,我们可以使用basic.qos方法并设置prefetch_count=1。这告诉RabbitMQ一次不要给一个工作进程发送超过一条消息。换句话说,不要在处理和确认上一条消息之前给工作进程发送新消息。相反,它将把它分发给下一个没有忙碌的工作进程。
来源:https://rabbitmq.cn/tutorials/tutorial-two-python.html
请注意,实现公平分发会引入延迟,这会损害性能(请参阅这篇博客文章)。但是,实现它允许您随着队列的增加动态地进行水平扩展。您应该像博客文章建议的那样,根据处理每条消息所需的时间以及您的网络性能来评估prefetch_size的值。
使用RabbitMqBundle,您可以像这样为每个消费者配置qos_options
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
回调
以下是一个回调示例
<?php //src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php namespace Acme\DemoBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class UploadPictureConsumer implements ConsumerInterface { public function execute(AMQPMessage $msg) { //Process picture upload. //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ. $isUploadSuccess = someUploadPictureMethod(); if (!$isUploadSuccess) { // If your image upload failed due to a temporary error you can return false // from your callback so the message will be rejected by the consumer and // requeued by RabbitMQ. // Any other value not equal to false will acknowledge the message and remove it // from the queue return false; } } }
如您所见,这只需要实现一个方法:ConsumerInterface::execute。
请记住,您的回调需要作为正常Symfony2服务进行注册。在那里,您可以注入服务容器、数据库服务、Symfony记录器等。
有关消息实例组成部分的更多详细信息,请参阅https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md。
总结
发送消息似乎要做很多工作,让我们回顾一下,以获得更好的概述。这是我们需要生产/消费消息的内容:
- 在配置中为消费者/生产者添加条目。
- 实现您的回调。
- 从CLI启动消费者。
- 在控制器中添加发布消息的代码。
就是这样!
RPC或回复/响应
到目前为止,我们只是向消费者发送消息,但如果我们想从他们那里获得回复怎么办?为了实现这一点,我们必须在我们的应用程序中实现RPC调用。这个包使得使用Symfony2实现这些事情变得非常容易。
让我们将RPC客户端和服务器添加到配置中
rpc_clients: integer_store: connection: default unserializer: json_decode rpc_servers: random_int: connection: default callback: random_int_server qos_options: {prefetch_size: 0, prefetch_count: 1, global: false} exchange_options: {name: random_int, type: topic} queue_options: {name: random_int_queue, durable: false, auto_delete: true} serializer: json_encode
有关完整的配置参考,请使用命令php app/console config:dump-reference old_sound_rabbit_mq
。
这里有一个非常有用的服务器:它向其客户端返回随机整数。用于处理请求的回调将是random_int_server服务。现在让我们看看如何从我们的控制器中调用它。
首先,我们必须从命令行启动服务器
$ ./app/console_dev rabbitmq:rpc-server random_int
然后,将以下代码添加到我们的控制器中
public function indexAction($name) { $client = $this->get('old_sound_rabbit_mq.integer_store_rpc'); $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id'); $replies = $client->getReplies(); }
如您所见,如果我们的客户端ID是integer_store,那么服务名称将是old_sound_rabbit_mq.integer_store_rpc。一旦我们获得该对象,我们通过调用addRequest
向服务器发送一个请求,该请求期望三个参数:
- 要发送给远程过程调用的参数。
- RPC服务器的名称,在我们的例子中为random_int。
- 我们调用请求的唯一标识符,在这个例子中为request_id。
我们发送的参数是 rand()
函数的 最小值 和 最大值。我们通过序列化一个数组来发送它们。如果服务器期望 JSON 信息或 XML,我们将在这里发送这样的数据。
最后一步是获取回复。我们的 PHP 脚本将阻塞,直到服务器返回一个值。变量 $replies 将是一个关联数组,其中每个来自服务器的回复都包含在相应的 request_id 键中。
默认情况下,RPC 客户端期望响应被序列化。如果您正在工作的服务器返回非序列化的结果,请将 RPC 客户端 expect_serialized_response 选项设置为 false。例如,如果 integer_store 服务器未序列化结果,客户端将设置如下:
rpc_clients: integer_store: connection: default expect_serialized_response: false
您还可以为请求设置秒数,超过这个时间后,服务器和客户端将不再处理消息,请求将简单地超时。设置消息的超时时间仅适用于 RabbitMQ 3.x 及以上版本。更多信息请访问 https://rabbitmq.cn/ttl.html#per-message-ttl。
public function indexAction($name) { $expiration = 5; // seconds $client = $this->get('old_sound_rabbit_mq.integer_store_rpc'); $client->addRequest($body, $server, $requestId, $routingKey, $expiration); try { $replies = $client->getReplies(); // process $replies['request_id']; } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) { // handle timeout } }
正如你所猜到的,我们也可以进行 并行 RPC 调用。
并行 RPC
假设为了渲染某个网页,你需要执行两个数据库查询,一个需要 5 秒完成,另一个需要 2 秒——非常昂贵的查询。如果你按顺序执行它们,那么你的页面大约在 7 秒后准备好交付。如果你并行运行它们,那么你的页面将在大约 5 秒内被提供。使用 RabbitMqBundle,我们可以轻松地进行这样的并行调用。让我们在配置中定义一个并行客户端和另一个 RPC 服务器。
rpc_clients: parallel: connection: default rpc_servers: char_count: connection: default callback: char_count_server random_int: connection: default callback: random_int_server
然后这段代码应该放在我们的控制器中
public function indexAction($name) { $client = $this->get('old_sound_rabbit_mq.parallel_rpc'); $client->addRequest($name, 'char_count', 'char_count'); $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int'); $replies = $client->getReplies(); }
与前面的例子非常相似,我们只是多了一个 addRequest
调用。我们还提供了有意义的请求标识符,这样以后在 $replies 数组中找到我们想要的回复会更简单。
多个消费者
拥有很多队列以实现逻辑分离是一种好习惯。使用简单的消费者,你必须为每个队列创建一个工作者(消费者),当处理许多演变时这可能很难管理(忘记在你的 supervisord 配置中添加一行?)。这对于小型队列也很有用,因为你可能不想拥有与队列一样多的工作者,并希望将一些任务重新组合在一起,同时不失去灵活性和分离原则。
多个消费者允许你通过在同一个消费者上监听多个队列来处理这种情况。
以下是如何设置具有多个队列的消费者:
multiple_consumers: upload: connection: default exchange_options: {name: 'upload', type: direct} queues_provider: queues_provider_service queues: upload-picture: name: upload_picture callback: upload_picture_service routing_keys: - picture upload-video: name: upload_video callback: upload_video_service routing_keys: - video upload-stats: name: upload_stats callback: upload_stats
现在回调指定在每个队列下,并且必须像简单消费者一样实现 ConsumerInterface
。消费者中所有 queues-options
的选项都适用于每个队列。
请注意,所有队列都在同一个交换机下,设置正确的回调路由取决于你。
queues_provider
是一个可选的服务,它可以动态地提供队列。它必须实现 QueuesProviderInterface
。
请注意,队列提供者负责正确调用 setDequeuer
,并且回调是可调用的(不是 ConsumerInterface
)。如果提供队列的服务实现了 DequeuerAwareInterface
,则定义服务时会添加一个调用 setDequeuer
的操作,其中 DequeuerInterface
当前是一个 MultipleConsumer
。
动态消费者
有时你必须动态地更改消费者的配置。动态消费者允许你根据上下文程序化地定义消费者队列选项。
例如,在一个场景中,定义的消费者必须负责动态数量的主题,并且你不想(或不能)每次都更改它的配置。
定义一个名为 queue_options_provider
的服务,该服务实现了 QueueOptionsProviderInterface
接口,并将其添加到您的 dynamic_consumers
配置中。
dynamic_consumers: proc_logs: connection: default exchange_options: {name: 'logs', type: topic} callback: parse_logs_service queue_options_provider: queue_options_provider_service
使用示例
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
在这种情况下,proc_logs
消费者运行在 server1
上,它可以决定使用的队列选项。
匿名消费者
那么我们为什么会需要匿名消费者呢?这听起来像是一种互联网威胁或类似的东西… 继续阅读。
在 AMQP 中,有一种称为 主题 的交换类型,消息根据——你猜对了——消息的主题被路由到队列。我们可以将关于我们应用程序的日志发送到使用主机名和日志严重性作为主题的 RabbitMQ 主题交换。消息体将是日志内容,我们的路由键将是这样的:
- server1.error
- server2.info
- server1.warning
- ...
由于我们不想让队列被无限多的日志填满,我们可以做的是,当我们要监控系统时,我们可以启动一个创建队列并将其附加到基于某些主题的 logs 交换的消费者,例如,我们希望看到服务器报告的所有错误。路由键将类似于:#.error。在这种情况下,我们必须想出一个队列名称,将其绑定到交换,获取日志,取消绑定并删除队列。幸运的是,AMPQ 提供了一种在声明和绑定队列时提供正确选项时自动执行此操作的方法。问题是您不想记住所有这些选项。因此,我们实现了 匿名消费者 模式。
当我们启动匿名消费者时,它将处理这些细节,我们只需要考虑实现消息到达时的回调。它之所以被称为匿名,是因为它不会指定队列名称,但它将等待 RabbitMQ 为它分配一个随机的名称。
现在,如何配置和运行这样的消费者?
anon_consumers: logs_watcher: connection: default exchange_options: {name: 'app-logs', type: topic} callback: logs_watcher
我们在这里指定交换名称及其类型,以及当消息到达时应执行的回调。
这个匿名消费者现在可以监听与同一交换链接并属于 topic 类型的生产者。
producers: app_logs: connection: default exchange_options: {name: 'app-logs', type: topic}
要启动一个 匿名消费者,我们使用以下命令
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher
与之前看到的命令相比,唯一的区别是它指定了 路由键:-r '#.error'
。
STDIN 生产者
有一个命令可以从 STDIN 读取数据并将其发布到 RabbitMQ 队列。要使用它,您首先需要在配置文件中配置一个 producer
服务,如下所示
producers: words: connection: default exchange_options: {name: 'words', type: direct}
该生产者将消息发布到 words
直接交换。当然,您可以根据需要调整配置。
然后假设您想发布某些 XML 文件的内容,以便它们被消费者群处理。您可以通过使用以下命令来发布它们
$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
这意味着您可以使用普通的 Unix 命令组合生产者。
让我们分解这个单行命令
$ find vendor/symfony/ -name "*.xml" -print0
该命令将在 symfony 文件夹内找到所有 .xml
文件,并将文件名打印出来。然后,每个文件名都通过 xargs
通过 cat
管道传递。
$ xargs -0 cat
最后,cat
的输出直接发送到我们的生产者,该生产者调用方式如下
$ ./app/console rabbitmq:stdin-producer words
它只接受一个参数,即您在 config.yml
文件中配置的生产者名称。
其他命令
设置 RabbitMQ 架构
此插件包的目的是让您的应用程序生成消息并将它们发布到您已配置的一些交换机。
在某些情况下,即使您的配置正确,您生成的消息也不会被路由到任何队列,因为它们不存在。负责队列消费的消费者必须运行才能创建队列。
当消费者数量较多时,为每个消费者启动命令可能是一件噩梦。
为了一次性创建交换机、队列和绑定,并确保您不会丢失任何消息,您可以运行以下命令
$ ./app/console rabbitmq:setup-fabric
当需要时,您可以将您的消费者和生成者配置为假设RabbitMQ架构已经定义。为此,请在您的配置中添加以下内容
producers: upload_picture: auto_setup_fabric: false consumers: upload_picture: auto_setup_fabric: false
默认情况下,消费者或生成器在启动时会使用RabbitMQ声明它所需的一切。使用时要小心,如果交换机或队列未定义,将出现错误。当您更改任何配置时,您需要运行上述setup-fabric命令以声明您的配置。
如何贡献
要贡献,只需打开一个带有您新代码的拉取请求,注意如果您添加了新功能或修改了现有功能,您必须在本README中说明它们的功能。如果您破坏了BC,您也必须进行记录。此外,您还必须更新CHANGELOG。所以
- 记录新功能。
- 更新CHANGELOG。
- 记录BC破坏性更改。
许可
见:resources/meta/LICENSE.md
致谢
此插件的结构和文档部分基于RedisBundle