georgii-web / rabbitmq-bundle
集成了 php-amqplib 与 Symfony 及 RabbitMq。之前为 emag-tech-labs/rabbitmq-bundle,oldsound/rabbitmq-bundle。
Requires
- php: ^7.4|^8.0
- php-amqplib/php-amqplib: ^2.12.2|^3.0
- psr/log: ^1.0 || ^2.0 || ^3.0
- symfony/config: ^4.4|^5.3|^6.0
- symfony/console: ^4.4|^5.3|^6.0
- symfony/dependency-injection: ^4.4|^5.3|^6.0
- symfony/event-dispatcher: ^4.4|^5.3|^6.0
- symfony/framework-bundle: ^4.4|^5.3|^6.0
- symfony/http-kernel: ^4.4|^5.3|^6.0
- symfony/yaml: ^4.4|^5.3|^6.0
Requires (Dev)
- phpstan/phpstan: ^1.2
- phpstan/phpstan-phpunit: ^1.0
- phpunit/phpunit: ^9.5
- symfony/serializer: ^4.4|^5.3|^6.0
Suggests
- ext-pcntl: *
- symfony/framework-bundle: To use this lib as a full Symfony Bundle and to use the profiler data collector
Replaces
- emag-tech-labs/rabbitmq-bundle: 2.11.1
- oldsound/rabbitmq-bundle: 2.11.1
- 2.11.1
- 2.11.0
- 2.10.2
- 2.10.1
- 2.10.0
- 2.9.0
- 2.8.0
- 2.7.1
- 2.7.0
- 2.6.0
- 2.5.3
- 2.5.2
- 2.5.1
- v1.15.1
- v1.15.0
- v1.14.4
- v1.14.3
- v1.14.2
- v1.14.1
- v1.14.0
- v1.13.0
- v1.12.0
- v1.11.2
- v1.11.1
- v1.11.0
- dev-master / 1.10.x-dev
- v1.10.0
- v1.9.0
- v1.8.0
- v1.7.0
- v1.6.0
- v1.5.0
- v1.4.1
- v1.4.0
- v1.3.2
- v1.3.1
- v1.3.0
- v1.2.1
- v1.2.0
- v1.1.3
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.0
- dev-patch-2
- dev-patch-1
- dev-add-php-cs-fixer
This package is auto-updated.
Last update: 2024-09-24 12:05:09 UTC
README
关于
RabbitMqBundle
通过 RabbitMQ 使用 php-amqplib 库将消息集成到您的应用程序中。
该捆绑包实现了在 Thumper 库中看到的一些消息模式。因此,从 Symfony 控制器向 RabbitMQ 发布消息就像
$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
稍后,当您想从 upload_pictures
队列中消费 50 条消息时,只需在 CLI 上运行
$ ./app/console rabbitmq:consumer -m 50 upload_picture
所有示例都假定 RabbitMQ 服务器正在运行。
此捆绑包在 Symfony Live Paris 2011 会议中展出。请参阅幻灯片 此处。
版本 2
由于 Symfony >=4.4 引起的破坏性更改,发布了一个新的标签,使捆绑包与 Symfony >=4.4 兼容。
安装
对于 Symfony 框架 >= 4.4
使用 composer 需求捆绑包及其依赖项
$ composer require php-amqplib/rabbitmq-bundle
注册捆绑包
// app/AppKernel.php public function registerBundles() { $bundles = array( new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ); }
享受!
对于使用 Symfony Console,依赖注入和配置组件的控制台应用程序
如果您有一个用于运行 RabbitMQ 消费者的控制台应用程序,您不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件加载此捆绑包配置和服务,然后使用消费者命令。
在您的 composer.json 文件中需求捆绑包
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
注册扩展和编译器传递
use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension; use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass; // ... $containerBuilder->registerExtension(new OldSoundRabbitMqExtension()); $containerBuilder->addCompilerPass(new RegisterPartsPass());
警告 - BC 破坏性更改
-
自 2012-06-04 以来,某些在 "producers" 配置部分声明的交换机的默认选项已更改,以匹配 "consumers" 部分中声明的交换机的默认选项。受影响的设置是
durable
已从false
更改为true
,auto_delete
已从true
更改为false
。
如果您依赖于之前的默认值,则必须更新您的配置。
-
自 2012-04-24 以来,ConsumerInterface::execute 方法签名已更改
-
自 2012-01-03 以来,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是体
使用
在您的配置文件中添加 old_sound_rabbit_mq
部分
old_sound_rabbit_mq: connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: '/' lazy: false connection_timeout: 3 read_write_timeout: 3 # requires php-amqplib v2.4.1+ and PHP5.4+ keepalive: false # requires php-amqplib v2.4.1+ heartbeat: 0 #requires php_sockets.dll use_socket: true # default false another: # A different (unused) connection defined by an URL. One can omit all parts, # except the scheme (amqp:). If both segment in the URL and a key value (see above) # are given the value from the URL takes precedence. # See https://rabbitmq.cn/uri-spec.html on how to encode values. url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6' producers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} service_alias: my_app_service # no alias by default default_routing_key: 'optional.routing.key' # defaults to '' if not set default_content_type: 'content/type' # defaults to 'text/plain' default_delivery_mode: 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2". consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service
在这里,我们配置连接服务和应用程序将拥有的消息端点。在这个例子中,您的服务容器将包含服务 old_sound_rabbit_mq.upload_picture_producer
和 old_sound_rabbit_mq.upload_picture_consumer
。后者期望存在一个名为 upload_picture_service
的服务。
如果您未指定客户端的连接,则客户端将寻找具有相同别名的连接。因此,对于我们的 upload_picture
,服务容器将寻找一个名为 upload_picture
的连接。
如果您需要添加可选的队列参数,则您的队列选项可以是如下所示
queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
另一个具有 20 秒消息 TTL 的示例
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
参数值必须是一个数据类型和值的列表。有效数据类型是
S
- 字符串I
- 整数D
- 小数T
- 时间戳F
- 表A
- 数组t
- 布尔值
根据您的需求调整 arguments
。
如果您想将队列绑定到特定的路由键,您可以在生产者或消费者配置中声明。
queue_options: name: "upload-picture" routing_keys: - 'android.#.upload' - 'iphone.upload'
重要提示 - 懒连接
在 Symfony 环境中,所有服务在每个请求中都是完全启动的,从版本 >= 4.3 开始,您可以声明一个服务为懒加载(懒加载服务)。此捆绑包仍然不支持新的懒加载服务功能,但您可以在连接配置中将 lazy: true
设置为避免在每次请求中与您的消息代理建立不必要的连接。由于性能原因,强烈建议使用懒连接,尽管懒选项默认禁用,以避免可能破坏已使用此捆绑包的应用程序。
重要提示 - 心跳
将 read_write_timeout
设置为心跳的两倍,这样您的套接字就会保持开启状态。如果您不这样做,或者使用不同的乘数,存在风险,消费者 套接字将超时。
请记住,如果您的任务通常运行时间超过心跳周期,您可能会遇到问题(链接)。考虑使用较大的心跳值或禁用心跳以使用 tcp 的 keepalive
(客户端和服务器端)和 graceful_max_execution_timeout
功能。
多个主机
您可以为连接提供多个主机。这将允许您使用具有多个节点的 RabbitMQ 集群。
old_sound_rabbit_mq: connections: default: hosts: - host: host1 port: 3672 user: user1 password: password1 vhost: vhost1 - url: 'amqp://guest:password@localhost:5672/vhost' connection_timeout: 3 read_write_timeout: 3
请注意,您不能为每个主机指定
connection_timeout read_write_timeout use_socket ssl_context keepalive heartbeat connection_parameters_provider
参数。
动态连接参数
有时您的连接信息可能需要动态的。动态连接参数允许您通过服务以编程方式提供或覆盖参数。
例如,在以下场景中,连接的 vhost
参数取决于您白标应用程序的当前租户,并且您不希望(或无法)每次都更改其配置。
在 connection_parameters_provider
下定义一个服务,该服务实现 ConnectionParametersProviderInterface
,并将其添加到适当的 connections
配置中。
connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: 'foo' # to be dynamically overridden by `connection_parameters_provider` connection_parameters_provider: connection_parameters_provider_service
示例实现
class ConnectionParametersProviderService implements ConnectionParametersProvider { ... public function getConnectionParameters() { return array('vhost' => $this->getVhost()); } ... }
在这种情况下,vhost
参数将被 getVhost()
的输出覆盖。
生产者、消费者,什么?
在消息应用中,向代理发送消息的过程称为 生产者,而接收这些消息的过程称为 消费者。在您的应用程序中,您将拥有几个它们,您可以在配置的相应条目下列出它们。
生产者
生产者将被用来向服务器发送消息。在 AMQP 模型中,消息被发送到 交换机,这意味着在生产者的配置中,您必须指定连接选项以及交换机选项,这通常是交换机的名称和类型。
现在假设您想在后台处理图片上传。在将图片移动到最终位置后,您将向服务器发布以下信息
public function indexAction($name) { $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg)); }
如您所见,如果您的配置中有一个名为 upload_picture 的生产者,那么在服务容器中,您将有一个名为 old_sound_rabbit_mq.upload_picture_producer 的服务。
除了消息本身之外,OldSound\RabbitMqBundle\RabbitMq\Producer#publish()
方法还接受一个可选的路由键参数和一个可选的附加属性数组。附加属性数组允许您更改默认情况下由 PhpAmqpLib\Message\AMQPMessage
对象构造的属性。这样,例如,您可以更改应用程序头。
您可以使用 setContentType 和 setDeliveryMode 方法分别设置消息内容类型和消息投递模式,覆盖“producers”配置部分中设置的任何默认值。如果未由“producers”配置或对这些方法的显式调用(如下例所示)覆盖,则默认值是内容类型为 text/plain,投递模式为 2。
$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');
如果您需要为生产者使用自定义类(应继承自 OldSound\RabbitMqBundle\RabbitMq\Producer
),则可以使用 class
选项。
... producers: upload_picture: class: My\Custom\Producer connection: default exchange_options: {name: 'upload-picture', type: direct} ...
拼图的下一部分是拥有一个消费者,该消费者将从队列中取出消息并相应地处理。
消费者
消费者将连接到服务器并启动一个 循环,等待处理传入的消息。这种消费者的指定 回调 将决定其行为。让我们回顾一下上面的消费者配置
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service
正如我们所看到的,回调 选项引用了 upload_picture_service。当消费者从服务器收到消息时,它将执行此回调。如果您需要指定不同的回调进行测试或调试,则可以在此处更改它。
除了回调之外,我们还指定了要使用的连接方式,就像我们对 生产者 所做的那样。其余选项是 exchange_options 和 queue_options。exchange_options 应与 生产者 使用的相同。在 queue_options 中,我们将提供一个 队列名称。为什么?
正如我们所说的,AMQP 中的消息是发布到 交换机 的。这并不意味着消息已经到达 队列。为了实现这一点,我们首先需要创建这样的 队列,然后将其绑定到 交换机。酷的地方在于,您可以绑定多个 队列 到一个 交换机,这样一条消息就可以到达多个目的地。这种方法的优点是解耦了生产者和消费者。生产者不在乎有多少消费者会处理他的消息。它只需要确保消息到达服务器。这样我们就可以在不需要更改控制器中的代码的情况下,扩展每次上传图片时执行的操作。
现在,如何运行一个消费者?有一个命令可以执行,如下所示
$ ./app/console rabbitmq:consumer -m 50 upload_picture
这意味着什么?我们正在执行 upload_picture 消费者,并指示它只消费 50 条消息。每次消费者从服务器接收到消息时,它都会执行配置的回调,将 AMQP 消息作为 PhpAmqpLib\Message\AMQPMessage
类的实例传递。可以通过调用 $msg->body
来获取消息体。默认情况下,消费者将以 无限循环 的方式处理消息,对于 无限 的定义。
如果您想确保消费者在 Unix 信号发生时立即执行完毕,可以使用带有标志 -w
的命令。
$ ./app/console rabbitmq:consumer -w upload_picture
然后消费者将立即执行完毕。
要使用带有此标志的命令,您需要安装带有 PCNTL 扩展的 PHP。
如果您想为消费者设置内存限制,可以使用标志 -l
。在以下示例中,此标志添加了 256 MB 的内存限制。消费者将在达到 256MB 前停止 5 MB,以避免 PHP 允许的内存大小错误。
$ ./app/console rabbitmq:consumer -l 256
如果您想清除队列中等待的所有消息,可以执行此命令以清除此队列
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
要删除消费者的队列,请使用此命令
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
消费者事件
这在许多场景中都很有用。有 3 个 AMQPEvents
CONSUME ON
class OnConsumeEvent extends AMQPEvent { const NAME = AMQPEvent::ON_CONSUME; /** * OnConsumeEvent constructor. * * @param Consumer $consumer */ public function __construct(Consumer $consumer) { $this->setConsumer($consumer); } }
假设您需要在新的应用程序部署时暂停/停止消费者。您可以通过监听 OldSound\RabbitMqBundle\Event\OnConsumeEvent` 来检查新的应用程序部署。
在处理消息之前
class BeforeProcessingMessageEvent extends AMQPEvent { const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE; /** * BeforeProcessingMessageEvent constructor. * * @param AMQPMessage $AMQPMessage */ public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage) { $this->setConsumer($consumer); $this->setAMQPMessage($AMQPMessage); } }
在处理 AMQPMessage
之前引发的事件。
在处理消息之后
class AfterProcessingMessageEvent extends AMQPEvent { const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE; /** * AfterProcessingMessageEvent constructor. * * @param AMQPMessage $AMQPMessage */ public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage) { $this->setConsumer($consumer); $this->setAMQPMessage($AMQPMessage); } }
在处理 AMQPMessage
之后引发的事件。如果处理消息会抛出异常,则不会引发该事件。
空闲消息
<?php class OnIdleEvent extends AMQPEvent { const NAME = AMQPEvent::ON_IDLE; /** * OnIdleEvent constructor. * * @param AMQPMessage $AMQPMessage */ public function __construct(Consumer $consumer) { $this->setConsumer($consumer); $this->forceStop = true; } }
当 wait
方法因超时而退出而没有接收到消息时引发事件。为了使用此事件,消费者必须配置 idle_timeout
。默认情况下,在空闲超时时退出,您可以在监听器中通过设置 $event->setForceStop(false)
来防止它。
空闲超时
如果您需要在一段时间内没有从队列中接收到消息时设置超时,您可以在秒中设置 idle_timeout
。idle_timeout_exit_code
指定当空闲超时时消费者应返回的退出代码。如果没有指定,消费者将抛出 PhpAmqpLib\Exception\AMQPTimeoutException
异常。
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service idle_timeout: 60 idle_timeout_exit_code: 0
超时等待
以秒为单位设置 timeout_wait
。timeout_wait
指定消费者在确定当前连接仍然有效之前将等待多长时间才能接收到新消息。
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service idle_timeout: 60 idle_timeout_exit_code: 0 timeout_wait: 10
优雅的最大执行超时
如果您希望消费者运行一定时间然后优雅地退出,请将 graceful_max_execution.timeout
以秒为单位设置。 "优雅退出" 的意思是,消费者将在当前正在运行的任务之后或立即退出,当等待新任务时。graceful_max_execution.exit_code
指定当优雅的最大执行超时发生时消费者应返回的退出代码。如果没有指定,消费者将使用状态 0
退出。
此功能与 supervisord 结合使用时非常出色,这可以允许进行定期的内存泄漏清理、与数据库/rabbitmq 的连接更新等。
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service graceful_max_execution: timeout: 1800 # 30 minutes exit_code: 10 # default is 0
公平分发
您可能已经注意到分发仍然不完全符合我们的期望。例如,在有两个工作者的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作者将始终忙碌,而另一个工作者几乎不会做任何工作。嗯,RabbitMQ 对此一无所知,仍然会平均分发消息。
这是因为 RabbitMQ 只在消息进入队列时分发消息。它不查看消费者未确认的消息数量。它只是盲目地将每第 n 条消息分发给第 n 个消费者。
为了克服这一点,我们可以使用具有
prefetch_count=1
设置的基本.qos 方法。这告诉 RabbitMQ 不要一次给工作者发送超过一条消息。换句话说,不要在工作者处理和确认之前的消息之后将其发送给工作者。相反,它将将其发送给下一个不忙碌的工作者。
来源: https://rabbitmq.cn/tutorials/tutorial-two-python.html
请谨慎行事,因为实现公平分发会引入延迟,这会损害性能(请参阅 此博客文章)。但是,实现它允许您在队列增加时动态地进行水平扩展。您应该评估,正如博客文章建议的那样,根据处理每条消息所需的时间和您的网络性能相应地确定预取大小。
使用 RabbitMqBundle,您可以通过以下方式配置每个消费者的 qos_options
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
自动装配生产者和消费者
如果与 Symfony 4.2+ 包一起使用,会在容器中声明生产者和普通消费者的别名集合。这些别名用于基于声明的类型和参数名称进行自动注入。这允许您将之前的生产者示例改为
public function indexAction($name, ProducerInterface $uploadPictureProducer) { $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $uploadPictureProducer->publish(serialize($msg)); }
参数名称由配置中的生产者或消费者名称构成,并根据类型添加生产者或消费者后缀。与容器项目命名约定中的单词后缀(生产者或消费者)不同,如果名称已经添加了后缀,则不会重复。 upload_picture
生产者键将更改为 $uploadPictureProducer
参数名称。upload_picture_producer
生产者键也会被别名到 $uploadPictureProducer
参数名称。最好避免以这种方式相似的名字。
所有生产者都别名为 OldSound\RabbitMqBundle\RabbitMq\ProducerInterface
和配置中的生产者类选项。在沙盒模式下,只创建 ProducerInterface
别名。强烈建议在为生产者注入参数时使用 ProducerInterface
类。
所有消费者都别名为 OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
和 %old_sound_rabbit_mq.consumer.class%
配置选项值。普通模式和沙盒模式之间没有区别。强烈建议在为客户端注入参数时使用 ConsumerInterface
。
回调
以下是一个回调示例
<?php //src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php namespace Acme\DemoBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class UploadPictureConsumer implements ConsumerInterface { public function execute(AMQPMessage $msg) { //Process picture upload. //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ. $isUploadSuccess = someUploadPictureMethod(); if (!$isUploadSuccess) { // If your image upload failed due to a temporary error you can return false // from your callback so the message will be rejected by the consumer and // requeued by RabbitMQ. // Any other value not equal to false will acknowledge the message and remove it // from the queue return false; } } }
如您所见,这只需要实现一个方法:ConsumerInterface::execute。
请注意,您的回调必须作为常规 Symfony 服务注册。在那里,您可以注入服务容器、数据库服务、Symfony 记录器等。
有关消息实例的更多详细信息,请参阅https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md。
要停止消费者,回调可以抛出 StopConsumerException
(最后消费的消息将不会确认)或 AckStopConsumerException
(消息将被确认)。如果使用守护进程,例如 supervisor,消费者实际上将重新启动。
总结
这看起来只是为了发送消息要做很多工作,让我们回顾一下,这是我们需要生产和消费消息的
- 在配置中添加消费者/生产者条目。
- 实现您的回调。
- 从 CLI 启动消费者。
- 在控制器中添加发布消息的代码。
就这样!
审计/记录
这是对接收/发布的消息进行可追溯性的要求。为了启用此功能,您需要在消费者或发布者中添加 enable_logger
配置。
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service enable_logger: true
如果您愿意,您还可以使用 monolog 的不同处理器处理队列的日志,通过引用通道 phpamqplib
。
RPC 或回复/响应
到目前为止,我们只是向消费者发送了消息,但如果我们想从他们那里得到回复怎么办?为了实现这一点,我们必须在我们的应用程序中实现 RPC 调用。此包使您能够通过 Symfony 轻松实现此类操作。
让我们将 RPC 客户端和服务器添加到配置中
rpc_clients: integer_store: connection: default #default: default unserializer: json_decode #default: unserialize lazy: true #default: false direct_reply_to: false rpc_servers: random_int: connection: default callback: random_int_server qos_options: {prefetch_size: 0, prefetch_count: 1, global: false} exchange_options: {name: random_int, type: topic} queue_options: {name: random_int_queue, durable: false, auto_delete: true} serializer: json_encode
有关完整的配置参考,请使用 php app/console config:dump-reference old_sound_rabbit_mq
命令。
这里有一个非常有用的服务器:它向其客户端返回随机整数。用于处理请求的回调将是 random_int_server 服务。现在让我们看看如何从我们的控制器中调用它。
首先,我们必须从命令行启动服务器
$ ./app/console_dev rabbitmq:rpc-server random_int
然后,将以下代码添加到我们的控制器中
public function indexAction($name) { $client = $this->get('old_sound_rabbit_mq.integer_store_rpc'); $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id'); $replies = $client->getReplies(); }
如您所见,如果我们的客户端 ID 是 integer_store,那么服务名称将是 old_sound_rabbit_mq.integer_store_rpc。一旦我们得到这个对象,我们通过调用 addRequest
将请求发送到服务器,该请求期望三个参数
- 要发送到远程过程调用的参数。
- RPC 服务器名称,在我们的情况下是 random_int。
- 我们通话的请求标识符,在本例中为 request_id。
我们发送的参数是 rand()
函数的 min 和 max 值。我们通过序列化一个数组来发送它们。如果我们的服务器期望 JSON 信息或 XML,我们将在这里发送此类数据。
最后一部分是获取回复。我们的 PHP 脚本将阻塞,直到服务器返回一个值。变量 $replies 将是一个关联数组,其中每个来自服务器的回复都将包含在相应的 request_id 键中。
默认情况下,RPC 客户端期望响应被序列化。如果您正在与之工作的服务器返回非序列化的结果,则将 RPC 客户端的 expect_serialized_response
选项设置为 false。例如,如果 integer_store 服务器没有序列化结果,客户端设置如下
rpc_clients: integer_store: connection: default expect_serialized_response: false
您还可以为请求设置毫秒级的过期时间,在此之后,消息将不再由服务器处理,客户端请求将简单地超时。设置消息过期时间仅适用于 RabbitMQ 3.x 及以上版本。有关更多信息,请访问 https://rabbitmq.cn/ttl.html#per-message-ttl
public function indexAction($name) { $expiration = 5000; // milliseconds $client = $this->get('old_sound_rabbit_mq.integer_store_rpc'); $client->addRequest($body, $server, $requestId, $routingKey, $expiration); try { $replies = $client->getReplies(); // process $replies['request_id']; } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) { // handle timeout } }
正如你所猜到的,我们也可以进行 并行 RPC 调用。
并行 RPC
假设为了渲染某些网页,你需要执行两个数据库查询,一个需要 5 秒完成,另一个需要 2 秒 ——非常昂贵的查询。如果你按顺序执行它们,那么你的页面将在大约 7 秒后准备好交付。如果你并行执行它们,那么你的页面将在大约 5 秒内提供服务。使用 RabbitMqBundle
我们可以轻松地进行此类并行调用。让我们在配置中定义一个并行客户端和另一个 RPC 服务器
rpc_clients: parallel: connection: default rpc_servers: char_count: connection: default callback: char_count_server random_int: connection: default callback: random_int_server
然后这段代码应该放在我们的控制器中
public function indexAction($name) { $client = $this->get('old_sound_rabbit_mq.parallel_rpc'); $client->addRequest($name, 'char_count', 'char_count'); $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int'); $replies = $client->getReplies(); }
与前面的例子非常相似,我们只是多了一个 addRequest
调用。我们还提供了有意义的请求标识符,这样我们稍后就可以更容易地在 $replies 数组中找到我们想要的回复。
直接回复到客户端
要启用 直接回复到客户端,你只需在客户端的 rpc_clients 配置中启用选项 direct_reply_to。
此选项将在执行 RPC 调用时使用伪队列 amq.rabbitmq.reply-to。在 RPC 服务器上不需要进行任何修改。
优先队列
从 3.5.0 版本开始,RabbitMQ 在核心中实现了优先队列。任何队列都可以通过客户端提供的可选参数(但,与使用可选参数的其他功能不同,不是策略)转换为优先队列。该实现支持有限数量的优先级:255。建议使用 1 到 10 之间的值。请查看文档
以下是声明优先队列的示例
consumers: upload_picture: connection: default exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} } callback: upload_picture_service
如果 upload-picture
队列在运行 rabbitmq:setup-fabric
命令之前存在,你必须先删除该队列
现在假设你想发送一个高优先级消息,你必须用以下附加信息发布消息
public function indexAction() { $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); $additionalProperties = ['priority' => 10] ; $routing_key = ''; $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg), $routing_key , $additionalProperties ); }
多个消费者
对于逻辑分离来说,拥有很多队列是一种很好的做法。使用简单的消费者,你必须为每个队列创建一个工作器(消费者),在处理许多演变时可能会很困难(忘记在 supervisord 配置中添加一行?)。这对于小型队列也很有用,因为你可能不希望拥有与队列一样多的工作者,并且想将一些任务组合在一起,同时不失去灵活性和分离原则。
多个消费者允许您通过在同一个消费者上监听多个队列来处理此用例。
以下是设置具有多个队列的消费者的方法
multiple_consumers: upload: connection: default exchange_options: {name: 'upload', type: direct} queues_provider: queues_provider_service queues: upload-picture: name: upload_picture callback: upload_picture_service routing_keys: - picture upload-video: name: upload_video callback: upload_video_service routing_keys: - video upload-stats: name: upload_stats callback: upload_stats
现在,回调被指定在每个队列下,并且必须像简单消费者一样实现 ConsumerInterface
。消费者中的所有 queues-options
选项都适用于每个队列。
请注意,所有队列都在同一个交换机下,路由回调的正确性取决于您。
queues_provider
是一个可选的服务,可以动态提供队列。它必须实现 QueuesProviderInterface
。
请注意,队列提供者负责正确调用 setDequeuer
并且回调是可调用的(不是 ConsumerInterface
)。如果提供队列的服务实现了 DequeuerAwareInterface
,则将向服务的定义中添加对 setDequeuer
的调用,其中 DequeuerInterface
目前是 MultipleConsumer
。
任意绑定
您可能会发现您的应用程序有一个复杂的流程,并且您需要任意绑定。任意绑定场景可能包括通过 destination_is_exchange
属性进行交换到交换机的绑定。
bindings: - {exchange: foo, destination: bar, routing_key: 'baz.*' } - {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
在创建任意绑定之前,rabbitmq:setup-fabric
命令将声明您的生产者、消费者和多个消费者配置中定义的交换机和队列。然而,rabbitmq:setup-fabric
将 不会 声明在绑定中定义的附加队列和交换机。确保交换机/队列已声明取决于您。
动态消费者
有时您必须动态更改消费者的配置。动态消费者允许您根据上下文以编程方式定义消费者队列选项。
例如,在一个场景中,定义的消费者必须负责动态数量的主题,并且您不想(或不能)每次都更改其配置。
定义一个实现 QueueOptionsProviderInterface
的服务 queue_options_provider
,并将其添加到您的 dynamic_consumers
配置中。
dynamic_consumers: proc_logs: connection: default exchange_options: {name: 'logs', type: topic} callback: parse_logs_service queue_options_provider: queue_options_provider_service
示例用法
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
在这种情况下,proc_logs
消费者为 server1
运行,并且它可以决定它使用的队列选项。
匿名消费者
现在,我们为什么需要匿名消费者呢?这听起来像某种互联网威胁或类似的东西……继续阅读。
在 AMQP 中,有一种称为 topic 的交换机类型,其中消息根据——您猜对了——消息的主题路由到队列。我们可以将有关我们应用程序的日志发送到使用主机名和日志严重性作为主题的 RabbiMQ 主题交换机。消息体将是日志内容,我们的路由键将如下所示
- server1.error
- server2.info
- server1.warning
- ...
由于我们不想让无限数量的日志填满队列,我们可以在想要监控系统时启动一个消费者,它创建一个队列并基于某些主题将其附加到 logs 交换机,例如,我们想要查看我们的服务器报告的所有错误。路由键将类似于:#.error。在这种情况下,我们必须想出一个队列名称,将其绑定到交换机,获取日志,取消绑定并删除队列。幸运的是,AMPQ 提供了一种在声明和绑定队列时提供正确选项时自动执行此操作的方法。问题是您不想记住所有这些选项。因此,我们实现了 匿名消费者 模式。
当我们启动匿名消费者时,它会处理这些细节,我们只需考虑实现消息到达时的回调。它被称为匿名,因为它不会指定队列名称,但它将等待 RabbitMQ 分配一个随机的名称给它。
现在,如何配置和运行这样的消费者?
anon_consumers: logs_watcher: connection: default exchange_options: {name: 'app-logs', type: topic} callback: logs_watcher
在此,我们指定交换机的名称和类型,以及当消息到达时应执行回调函数。
此匿名消费者现在可以监听与同一交换机相关联且类型为topic的生产者。
producers: app_logs: connection: default exchange_options: {name: 'app-logs', type: topic}
要启动一个匿名消费者,我们使用以下命令
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher
与我们之前看到的命令相比,唯一的新选项是指定路由键:-r '#.error'
。
批量消费者
在某些情况下,您可能希望获取一批消息,然后对它们进行一些处理。批量消费者将允许您定义此类处理的逻辑。
例如:想象一下,您有一个队列,您在其中接收一条消息以在数据库中插入某些信息,您意识到如果进行批量插入,会比逐个插入要好得多。
定义一个实现BatchConsumerInterface
的回调服务,并将消费者的定义添加到您的配置中。
batch_consumers: batch_basic_consumer: connection: default exchange_options: {name: 'batch', type: fanout} queue_options: {name: 'batch'} callback: batch.basic qos_options: {prefetch_size: 0, prefetch_count: 2, global: false} timeout_wait: 5 auto_setup_fabric: false idle_timeout_exit_code: -2 keep_alive: false graceful_max_execution: timeout: 60
注意:如果将keep_alive
选项设置为true
,则将忽略idle_timeout_exit_code
,消费者进程将继续。
您可以实现一个批量消费者,它将在一个返回中确认所有消息,或者您可以控制要确认哪些消息。
namespace AppBundle\Service; use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class DevckBasicConsumer implements BatchConsumerInterface { /** * @inheritDoc */ public function batchExecute(array $messages) { echo sprintf('Doing batch execution%s', PHP_EOL); foreach ($messages as $message) { $this->executeSomeLogicPerMessage($message); } // you ack all messages got in batch return true; } }
namespace AppBundle\Service; use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class DevckBasicConsumer implements BatchConsumerInterface { /** * @inheritDoc */ public function batchExecute(array $messages) { echo sprintf('Doing batch execution%s', PHP_EOL); $result = []; /** @var AMQPMessage $message */ foreach ($messages as $message) { $result[$message->getDeliveryTag()] = $this->executeSomeLogicPerMessage($message); } // you ack only some messages that have return true // e.g: // $return = [ // 1 => true, // 2 => true, // 3 => false, // 4 => true, // 5 => -1, // 6 => 2, // ]; // The following will happen: // * ack: 1,2,4 // * reject and requeq: 3 // * nack and requeue: 6 // * reject and drop: 5 return $result; } }
如何运行以下批量消费者
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
重要:批量消费者将没有-m|messages
选项可用。重要:如果您只想消费特定数量的批次然后停止消费者,批量消费者也可以有-b|batches
选项可用。仅当您想消费者在消费了这些批次消息后停止时才提供批次数!
STDIN 生产者
有一个命令可以从STDIN读取数据并将其发布到RabbitMQ队列。要使用它,首先您必须在配置文件中配置一个producer
服务,如下所示
producers: words: connection: default exchange_options: {name: 'words', type: direct}
该生产者将消息发布到words
直接交换。当然,您可以根据需要调整配置。
然后假设您想发布一些XML文件的内容,以便由消费者农场进行处理。您可以使用以下命令来发布它们
$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
这意味着您可以使用纯Unix命令来组合生产者。
让我们分解这个单行命令
$ find vendor/symfony/ -name "*.xml" -print0
该命令将在symfony文件夹内找到所有.xml
文件,并将文件名打印出来。然后,每个文件名都通过xargs
通过管道传递给cat
。
$ xargs -0 cat
最后,cat
的输出直接传递给我们的生产者,该生产者调用方式如下
$ ./app/console rabbitmq:stdin-producer words
它只需要一个参数,即您在config.yml
文件中配置的生产者名称。
其他命令
设置RabbitMQ结构
该组件的目的是让您的应用程序生产消息并将它们发布到您配置的某些交换机。
在某些情况下,即使配置正确,您产生的消息也可能不会被路由到任何队列,因为没有队列存在。负责队列消费的消费者必须运行,队列才会被创建。
当消费者数量较多时,为每个消费者启动命令可能是一个噩梦。
为了一次性创建交换机、队列和绑定,并确保不会丢失任何消息,您可以运行以下命令
$ ./app/console rabbitmq:setup-fabric
当需要时,您可以配置您的消费者和生产者,假设RabbitMQ结构已经定义。为此,请在您的配置中添加以下内容
producers: upload_picture: auto_setup_fabric: false consumers: upload_picture: auto_setup_fabric: false
默认情况下,消费者或生产者在启动时会向RabbitMQ声明所需的一切。使用时请小心,如果在没有定义交换机或队列的情况下使用,将会出现错误。当您更改任何配置时,需要运行上面的setup-fabric命令来声明您的配置。
如何贡献
要贡献,只需打开一个带有您新代码的Pull Request,注意如果您添加新功能或修改现有功能,您必须在README中说明它们的功能。如果您破坏了BC,您也必须进行说明。另外,您还需要更新CHANGELOG。所以
- 记录新功能。
- 更新CHANGELOG。
- 记录破坏性变更。
许可证
查看:resources/meta/LICENSE.md
致谢
该组件结构和文档部分基于RedisBundle。