mikemadisonweb / yii2-rabbitmq
基于php-amqplib的包装器,用于通过RabbitMQ将消息集成到您的Yii2应用程序中。灵感来源于Symfony 2的RabbitMqBundle,是一个非常棒的包。
Requires
- php: ^7.1||^8.0
- php-amqplib/php-amqplib: ^3.1
- yiisoft/yii2: ^2.0.43
Requires (Dev)
- php-coveralls/php-coveralls: ^2.5
- phpunit/phpunit: ^9.5.10
README
基于php-amqplib库的包装器,用于通过RabbitMQ将消息集成到您的Yii2应用程序中。灵感来源于Symfony框架的RabbitMqBundle。
本文档适用于版本2.*,需要PHP版本>=7.0。对于PHP版本>=5.4的旧版应用程序,请使用此扩展的先前版本。
安装
安装此扩展的首选方式是通过composer。
运行
php composer.phar require mikemadisonweb/yii2-rabbitmq
或添加
"mikemadisonweb/yii2-rabbitmq": "^2.2.0"
到您的composer.json
文件的require部分。
配置
此扩展简化了创建RabbitMQ生产者和消费者以满足您特定需求的操作。以下是一个基本配置示例
<?php return [ // should be in common.php 'components' => [ // ... 'rabbitmq' => [ 'class' => \mikemadisonweb\rabbitmq\Configuration::class, 'connections' => [ [ // You can pass these parameters as a single `url` option: https://rabbitmq.cn/uri-spec.html 'host' => 'YOUR_HOSTNAME', 'port' => '5672', 'user' => 'YOUR_USERNAME', 'password' => 'YOUR_PASSWORD', 'vhost' => '/', ] // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks ], 'exchanges' => [ [ 'name' => 'YOUR_EXCHANGE_NAME', 'type' => 'direct' // Refer to Defaults section for all possible options ], ], 'queues' => [ [ 'name' => 'YOUR_QUEUE_NAME', // Queue can be configured here the way you want it: //'durable' => true, //'auto_delete' => false, ], [ 'name' => 'YOUR_ANOTHER_QUEUE_NAME', ], ], 'bindings' => [ [ 'queue' => 'YOUR_QUEUE_NAME', 'exchange' => 'YOUR_EXCHANGE_NAME', 'routing_keys' => ['YOUR_ROUTING_KEY'], ], ], 'producers' => [ [ 'name' => 'YOUR_PRODUCER_NAME', ], ], 'consumers' => [ [ 'name' => 'YOUR_CONSUMER_NAME', // Every consumer should define one or more callbacks for corresponding queues 'callbacks' => [ // queue name => callback class name 'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class, ], ], ], ], ], ];
要使用此扩展,您应该熟悉RabbitMQ的基本概念。如果您对此知识不太自信,建议阅读这篇文章。
'callback'参数可以是类名或依赖注入容器中的服务名。从Yii版本2.0.11开始,您可以像这样配置容器
<?php use yii\di\Instance; return [ // ... 'container' => [ 'definitions' => [], 'singletons' => [ 'rabbitmq.import-data.consumer' => [ [ 'class' => \path\to\YourConsumer::class, ], [ // If dependency is needed 'some-dependency' => Instance::of('dependency-service-name'), ], ], ], ], ];
如果您需要多个消费者,可以在配置中列出相应的条目,但这将需要一个单独的工作进程(守护进程)来处理每个消费者。如果您处理的是小型队列并且消息的消费速度非常快,那么将它们分组到一个工作进程中可能非常好。所以只需在消费者配置中列出您的回调,然后一个工作进程将在多个队列上执行您的业务逻辑。
请确保在相应的绑定中定义了所有队列和交换机,正确的消息路由设置取决于您。
生命周期事件
还实现了一些生命周期事件:before_consume、after_consume、before_publish、after_publish。您可以使用它们在消息被消费/发布前后执行任何额外的工作。例如,确保在消费者作为长时间运行的过程时,Yii知道数据库连接已被超时关闭。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (\Yii::$app->has('db') && \Yii::$app->db->isActive) { try { \Yii::$app->db->createCommand('SELECT 1')->query(); } catch (\yii\db\Exception $exception) { \Yii::$app->db->close(); } } }, ], // ... ], ];
记录器
最后但同样重要的是,记录器配置也是可选的
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'log' => true, 'category' => 'application', 'print_console' => false, 'system_memory' => false, ], ], // ... ], ];
默认禁用记录器。启用时,它将消息记录到主应用程序日志或您指定的相应类别名称的自定义日志目标。'print_console'选项在调试控制台中的消费者时提供更多信息。
示例
简单的Yii2基本模板的设置,以及RabbitMQ扩展,可在此处找到。您可以随意实验它,并以隔离的方式调试现有的配置。
控制台命令
扩展提供了几个控制台命令
- rabbitmq/consume - 运行消费者
- rabbitmq/declare-all - 根据配置创建RabbitMQ交换机、队列和绑定
- rabbitmq/declare-exchange - 创建配置中列出的交换机
- rabbitmq/declare-queue - 创建配置中列出的队列
- rabbitmq/delete-all - 删除配置中定义的所有RabbitMQ交换机和队列
- rabbitmq/delete-exchange - 删除交换机
- rabbitmq/delete-queue - 删除队列
- rabbitmq/publish - 从标准输入发布消息到队列
- rabbitmq/purge-queue - 从队列中删除所有消息
启动消费者
yii rabbitmq/consume YOUR_CONSUMER_NAME
在这种情况下,您可以使用进程控制系统,如Supervisor,来重启消费者进程,这样就可以使工作进程持续运行。
消息限制
由于PHP守护进程,尤其是基于框架的守护进程可能容易发生内存泄漏,因此限制要消费和停止的消息数量可能是合理的。
--memoryLimit, -l: (defaults to 0)
--messagesLimit, -m: (defaults to 0)
自动声明
默认情况下,扩展配置在自动声明模式下,这意味着在每次发布消息的交换机、队列和绑定都会进行检查和创建(如果缺少)。如果性能对您的应用程序非常重要,您应该禁用此功能,并在配置中使用控制台命令自行声明和删除路由模式。
用法
由于消费者工作进程将读取队列中的消息,执行回调方法并将消息传递给它。
消费
为了使一个类成为回调,它应该实现ConsumerInterface接口
<?php namespace components\rabbitmq; use mikemadisonweb\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class YourConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = $msg->body; // Apply your business logic here return ConsumerInterface::MSG_ACK; } }
您可以发布任何数据类型(对象、int、数组等),尽管RabbitMQ将有效负载作为字符串传输,但在消费者中 $msg->body 的数据将与发送时的数据类型相同。
返回代码
关于返回代码,有一系列它们以控制您通过代理的消息的后续处理
- ConsumerInterface::MSG_ACK - 确认消息(标记为已处理)并从队列中删除它
- ConsumerInterface::MSG_REJECT - 拒绝并从队列中删除消息
- ConsumerInterface::MSG_REJECT_REQUEUE - 拒绝并将消息重新入队到RabbitMQ
发布
以下是一个如何发布消息的示例
$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME'); $msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]); $producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY');
路由键作为第三个参数是可选的,这适用于扇出交换机。
默认情况下,只有在发布消息时才会建立与代理的连接,如果没有必要,它不会在每次HTTP请求上尝试连接。
选项
所有配置选项
$rabbitmq_defaults = [ 'auto_declare' => true, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, 'type' => AMQPLazyConnection::class, 'url' => null, 'host' => null, 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'connection_timeout' => 3, 'read_write_timeout' => 3, 'ssl_context' => null, 'keepalive' => false, 'heartbeat' => 0, 'channel_rpc_timeout' => 0.0 ], ], 'exchanges' => [ [ 'name' => null, 'type' => null, 'passive' => false, 'durable' => true, 'auto_delete' => false, 'internal' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'queues' => [ [ 'name' => '', 'passive' => false, 'durable' => true, 'exclusive' => false, 'auto_delete' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'bindings' => [ [ 'exchange' => null, 'queue' => null, 'to_exchange' => null, 'routing_keys' => [], ], ], 'producers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'safe' => true, 'content_type' => 'text/plain', 'delivery_mode' => 2, 'serializer' => 'serialize', ], ], 'consumers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'callbacks' => [], 'qos' => [ 'prefetch_size' => 0, 'prefetch_count' => 0, 'global' => false, ], 'idle_timeout' => 0, 'idle_timeout_exit_code' => null, 'proceed_on_exception' => false, 'deserializer' => 'unserialize', ], ], 'logger' => [ 'log' => false, 'category' => 'application', 'print_console' => true, 'system_memory' => false, ], ];
交换机
例如,要声明一个交换机,您应该提供其名称和类型。
使用 arguments
参数的一个好例子可以创建一个 死信交换机。
队列
至于队列声明,所有参数都是可选的。即使您不提供队列名称,服务器也会为您生成一个唯一的名称。
有关选项、它们的默认值和有价值的信息的完整说明可以在 AMQP 0-9-1 参考指南 中找到。
请注意,并非所有这些选项都可以“即时”更改,也就是说,在队列或交换机创建之后。否则,您将收到一个错误。
破坏性变更
从版本 1.* 开始,此扩展完全重写了内部结构,可以被认为是全新的。然而,以下关键区别可以区分开来
- PHP版本 7.0 及以上版本要求
- 配置格式已更改
- 所有扩展组件都通过 Yii2 启动 自动加载
- 支持不同的连接类型
- 所有扩展组件都在DIC中注册为单例
- 添加了路由组件以控制代理中的模式
- 队列和交换机的默认选项已更改
- 控制台命令已合并到单个控制器类中,该类会自动添加,无需配置
- 添加了新的控制台命令以操作路由模式
- 支持所有数据类型作为消息有效负载
- 消费者以可预测的方式处理控制信号