elementfani / yii2-rabbitmq
基于 php-amqplib 的包装器,通过 RabbitMQ 在 Yii2 应用程序中集成消息。受到 Symfony 2 的 RabbitMqBundle 的启发,这是一个非常棒的包。
Requires
- php: ^7.0 || ^8.0
- php-amqplib/php-amqplib: ^3.0
- yiisoft/yii2: ^2.0.13
Requires (Dev)
- php-coveralls/php-coveralls: ^1.0
- phpunit/phpunit: ^6.4
README
基于 php-amqplib 库的包装器,通过 RabbitMQ 在 Yii2 应用程序中集成消息。受到 Symfony 框架的 RabbitMqBundle 的启发。
本文档适用于版本 2.*,需要 PHP 版本 >=7.0。对于 PHP 5.4 及以上版本的旧版应用程序,请使用 此扩展的先前版本。
安装
安装此扩展的首选方式是通过 composer。
运行
php composer.phar require mikemadisonweb/yii2-rabbitmq
或添加
"mikemadisonweb/yii2-rabbitmq": "^2.2.0"
到您的 composer.json 文件的 require 部分。
配置
此扩展有助于创建 RabbitMQ 生产者和消费者 来满足您的特定需求。这是一个基本的配置示例
<?php return [ // should be in common.php 'components' => [ // ... 'rabbitmq' => [ 'class' => \mikemadisonweb\rabbitmq\Configuration::class, 'connections' => [ [ // You can pass these parameters as a single `url` option: https://rabbitmq.cn/uri-spec.html 'host' => 'YOUR_HOSTNAME', 'port' => '5672', 'user' => 'YOUR_USERNAME', 'password' => 'YOUR_PASSWORD', 'vhost' => '/', ] // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks ], 'exchanges' => [ [ 'name' => 'YOUR_EXCHANGE_NAME', 'type' => 'direct' // Refer to Defaults section for all possible options ], ], 'queues' => [ [ 'name' => 'YOUR_QUEUE_NAME', // Queue can be configured here the way you want it: //'durable' => true, //'auto_delete' => false, ], [ 'name' => 'YOUR_ANOTHER_QUEUE_NAME', ], ], 'bindings' => [ [ 'queue' => 'YOUR_QUEUE_NAME', 'exchange' => 'YOUR_EXCHANGE_NAME', 'routing_keys' => ['YOUR_ROUTING_KEY'], ], ], 'producers' => [ [ 'name' => 'YOUR_PRODUCER_NAME', ], ], 'consumers' => [ [ 'name' => 'YOUR_CONSUMER_NAME', // Every consumer should define one or more callbacks for corresponding queues 'callbacks' => [ // queue name => callback class name 'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class, ], ], ], ], ], ];
要使用此扩展,您应该熟悉 RabbitMQ 的基本概念。如果您对自己的知识不太自信,我建议阅读 这篇文章。
'callback' 参数可以是类名或来自 依赖注入容器 的服务名。从 Yii 版本 2.0.11 开始,您可以像这样配置您的容器
<?php use yii\di\Instance; return [ // ... 'container' => [ 'definitions' => [], 'singletons' => [ 'rabbitmq.import-data.consumer' => [ [ 'class' => \path\to\YourConsumer::class, ], [ // If dependency is needed 'some-dependency' => Instance::of('dependency-service-name'), ], ], ], ], ];
如果您需要多个消费者,可以在配置中列出相应的条目,但每个消费者都需要一个单独的工人(守护进程)。如果您处理的是小型队列,并且消息消费得非常快,那么将它们组合到一个工人中可能绝对是可以接受的。所以只需在消费者配置中列出您的回调,一个工人就会在多个队列上执行您的业务逻辑。
确保所有队列和交换机都在相应的绑定中定义,设置正确消息路由是您的责任。
生命周期事件
还实现了一些生命周期事件:before_consume, after_consume, before_publish, after_publish。您可以使用它们在消息被消费/发布前后执行任何附加工作。例如,确保 Yii 知道数据库连接已由超时关闭,因为消费者是一个长时间运行的过程
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (\Yii::$app->has('db') && \Yii::$app->db->isActive) { try { \Yii::$app->db->createCommand('SELECT 1')->query(); } catch (\yii\db\Exception $exception) { \Yii::$app->db->close(); } } }, ], // ... ], ];
记录器
最后但并非最不重要的是记录器配置,这也是可选的
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'log' => true, 'category' => 'application', 'print_console' => false, 'system_memory' => false, ], ], // ... ], ];
默认禁用记录器。启用时,它将消息记录到主应用程序日志或您指定的相应类别名称的自己的日志目标。选项 'print_console' 在调试控制台中的消费者时提供额外信息。
示例
带有 RabbitMQ 扩展的 Yii2 基本模板的简单设置可在 此处 获取。请随意实验它,并以隔离的方式调试现有的配置。
控制台命令
扩展提供了几个控制台命令
- rabbitmq/consume - 运行消费者
- rabbitmq/declare-all - 根据配置创建 RabbitMQ 交换机、队列和绑定
- rabbitmq/declare-exchange - 创建配置中列出的交换机
- rabbitmq/declare-queue - 创建配置中列出的队列
- rabbitmq/delete-all - 删除配置中定义的所有RabbitMQ交换机和队列
- rabbitmq/delete-exchange - 删除交换机
- rabbitmq/delete-queue - 删除队列
- rabbitmq/publish - 从标准输入发布消息到队列
- rabbitmq/purge-queue - 从队列中删除所有消息
启动消费者
yii rabbitmq/consume YOUR_CONSUMER_NAME
在这种情况下,您可以使用进程控制系统,如Supervisor,来重启消费者进程,这样可以使您的工人持续运行。
消息限制
由于PHP守护进程特别是基于框架的可能会出现内存泄漏,因此限制要消费和停止的消息数量可能是合理的。
--memoryLimit, -l: (defaults to 0)
--messagesLimit, -m: (defaults to 0)
自动声明
默认情况下,扩展配置在自动声明模式,这意味着在每次发布消息的交换机、队列和绑定将会检查并创建如果缺失。如果性能对您的应用程序非常重要,您应该在配置中禁用该功能并使用控制台命令自行声明和删除路由模式。
用法
作为消费者工作者将从队列中读取消息,执行回调方法并将消息传递给它。
消费
为了使一个类成为回调,它应该实现ConsumerInterface接口
<?php namespace components\rabbitmq; use mikemadisonweb\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class YourConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = $msg->body; // Apply your business logic here return ConsumerInterface::MSG_ACK; } }
您可以发布任何数据类型(对象、整型、数组等),尽管RabbitMQ在这里将负载作为字符串传输,但在消费者中的$msg->body您的数据将是相同类型它发送。
返回代码
关于返回代码,有许多代码可以用来控制代理对消息的后续处理
- ConsumerInterface::MSG_ACK - 确认消息(标记为已处理)并从队列中删除
- ConsumerInterface::MSG_REJECT - 拒绝并从队列中删除消息
- ConsumerInterface::MSG_REJECT_REQUEUE - 拒绝并将消息重新入队到RabbitMQ
发布
以下是如何发布消息的示例
$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME'); $msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]); $producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY');
路由键作为第三个参数是可选的,这对于扇出交换机是可能的。
默认情况下,只有在发布消息时才会建立与代理的连接,如果不需要,它不会尝试在每次HTTP请求时连接。
选项
所有配置选项
$rabbitmq_defaults = [ 'auto_declare' => true, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, 'type' => AMQPLazyConnection::class, 'url' => null, 'host' => null, 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'connection_timeout' => 3, 'read_write_timeout' => 3, 'ssl_context' => null, 'keepalive' => false, 'heartbeat' => 0, 'channel_rpc_timeout' => 0.0 ], ], 'exchanges' => [ [ 'name' => null, 'type' => null, 'passive' => false, 'durable' => true, 'auto_delete' => false, 'internal' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'queues' => [ [ 'name' => '', 'passive' => false, 'durable' => true, 'exclusive' => false, 'auto_delete' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'bindings' => [ [ 'exchange' => null, 'queue' => null, 'to_exchange' => null, 'routing_keys' => [], ], ], 'producers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'safe' => true, 'content_type' => 'text/plain', 'delivery_mode' => 2, 'serializer' => 'serialize', ], ], 'consumers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'callbacks' => [], 'qos' => [ 'prefetch_size' => 0, 'prefetch_count' => 0, 'global' => false, ], 'idle_timeout' => 0, 'idle_timeout_exit_code' => null, 'proceed_on_exception' => false, 'deserializer' => 'unserialize', ], ], 'logger' => [ 'log' => false, 'category' => 'application', 'print_console' => true, 'system_memory' => false, ], ];
交换
例如,为了声明一个交换机,您应该提供其名称和类型。
使用arguments参数的一个很好的用例是创建死信交换机。
队列
至于队列声明,所有参数都是可选的。即使您没有为您的队列提供名称,服务器也会为您生成一个唯一的名称。
有关选项的完整说明、它们的默认值和有价值的信息,可以在AMQP 0-9-1 参考指南中找到。
请注意,不是所有这些选项都可以在“on-the-fly”更改,换句话说,在队列或交换机已创建之后。否则,您将收到一个错误。
破坏性变化
自1.*版本以来,此扩展已完全重写内部,可以认为是全新的。但是,以下关键差异可以区分开来
- PHP版本7.0及以上
- 配置格式已更改
- 所有扩展组件都使用Yii2 Bootstraping自动加载
- 支持不同的连接类型
- 所有扩展组件都在DIC中注册为单例
- 添加路由组件以控制代理中的模式
- 队列和交换机的默认选项已更改
- 控制台命令已合并到一个控制器类中,该类会自动添加,不需要配置
- 添加了新的控制台命令来操作路由模式
- 支持所有数据类型作为消息负载
- 消费者以可预测的方式处理控制信号