rickcy / yii2-rabbitmq
基于 php-amqplib 的包装器,通过 RabbitMQ 在 Yii2 应用程序中集成消息。
Requires
- php: ^7.1
- ext-json: *
- ext-pcntl: *
- php-amqplib/php-amqplib: ^2
- yiisoft/yii2: ^2.0.32
Requires (Dev)
- php-coveralls/php-coveralls: ^2
- phpunit/phpunit: ^8
- roave/security-advisories: dev-master
README
基于 php-amqplib 库的包装器,通过 RabbitMQ 在 Yii2 应用程序中集成消息。
本文档适用于版本 2.*,需要 PHP 版本 >=7.0
安装
安装此扩展的最佳方式是通过 composer。
运行以下命令之一:
php composer.phar require rickcy/yii2-rabbitmq
或添加以下内容到您的 composer.json
文件的 require 部分。
"rickcy/yii2-rabbitmq": "^1.0"
配置
此扩展简化了 RabbitMQ 生产者和消费者 的创建,以满足您的特定需求。以下是一个基本配置示例:
要使用此扩展,您应该熟悉 RabbitMQ 的基本概念。如果您对此不太自信,建议阅读 这篇文章。
<?php return [ // should be in common.php 'components' => [ // ... 'rabbitmq' => [ 'class' => \rickcy\rabbitmq\Configuration::class, 'connections' => [ [ // You can pass these parameters as a single `url` option: https://rabbitmq.cn/uri-spec.html 'host' => 'YOUR_HOSTNAME', 'port' => '5672', 'user' => 'YOUR_USERNAME', 'password' => 'YOUR_PASSWORD', 'vhost' => '/', ] // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks ], 'exchanges' => [ [ 'name' => 'YOUR_EXCHANGE_NAME', 'type' => 'direct' // Refer to Defaults section for all possible options ], ], 'queues' => [ [ 'name' => 'YOUR_QUEUE_NAME', // Queue can be configured here the way you want it: //'durable' => true, //'auto_delete' => false, ], [ 'name' => 'YOUR_ANOTHER_QUEUE_NAME', ], ], 'bindings' => [ [ 'queue' => 'YOUR_QUEUE_NAME', 'exchange' => 'YOUR_EXCHANGE_NAME', 'routing_keys' => ['YOUR_ROUTING_KEY'], ], ], 'producers' => [ [ 'name' => 'YOUR_PRODUCER_NAME', ], ], 'consumers' => [ [ 'name' => 'YOUR_CONSUMER_NAME', // Every consumer should define one or more callbacks for corresponding queues 'callbacks' => [ // queue name => callback class name 'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class, ], ], ], ], ], ];
'callback' 参数可以是类名或来自 依赖注入容器 的服务名。从 Yii 版本 2.0.11 开始,您可以像这样配置您的容器
如果您需要多个消费者,可以在配置中列出相应的条目,但这样将为每个消费者需要单独的工作进程(守护进程)。如果您处理的是小队列并且消息消费得很快,那么将它们组合到一个工作进程中可能是绝对合适的。所以只需在消费者配置中列出您的回调,一个工作进程就会在多个队列上执行您的业务逻辑。
<?php use yii\di\Instance; return [ // ... 'container' => [ 'definitions' => [], 'singletons' => [ 'rabbitmq.import-data.consumer' => [ [ 'class' => \path\to\YourConsumer::class, ], [ // If dependency is needed 'some-dependency' => Instance::of('dependency-service-name'), ], ], ], ], ];
请确保所有队列和交换机都在相应的绑定中定义,正确设置消息路由取决于您。
生命周期事件
还实现了一些生命周期事件:before_consume、after_consume、before_publish、after_publish。您可以使用它们在消息被消费/发布前后进行任何附加工作。例如,确保 Yii 知道数据库连接在消费者(一个长时间运行的过程)超时后被关闭。
日志记录器
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (\Yii::$app->has('db') && \Yii::$app->db->isActive) { try { \Yii::$app->db->createCommand('SELECT 1')->query(); } catch (\yii\db\Exception $exception) { \Yii::$app->db->close(); } } }, ], // ... ], ];
最后但同样重要的是日志记录器配置,这也是可选的。
默认情况下禁用日志记录器。当启用时,它将日志消息记录到主应用程序日志或指定的类别名称对应的自定义日志目标。选项 'print_console' 在您在控制台中调试消费者时提供更多信息。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'log' => true, 'category' => 'application', 'print_console' => false, 'system_memory' => false, ], ], // ... ], ];
示例
在 这里 可以找到使用 RabbitMQ 扩展的 Yii2 基本模板的简单设置。您可以自由地对其进行实验并单独调试现有的配置。
控制台命令
此扩展提供了一些控制台命令
- rabbitmq/consume - 运行消费者
- rabbitmq/declare-all - 根据配置创建 RabbitMQ 交换机、队列和绑定
- rabbitmq/declare-exchange - 创建配置中列出的交换机
- rabbitmq/declare-queue - 创建配置中列出的队列
- rabbitmq/delete-all - 删除配置中定义的所有 RabbitMQ 交换机和队列
- rabbitmq/delete-exchange - 删除交换机
- rabbitmq/delete-queue - 删除队列
- rabbitmq/publish - 从 STDIN 发布消息到队列
- rabbitmq/purge-queue - 删除队列中的所有消息
启动消费者
yii rabbitmq/consume YOUR_CONSUMER_NAME
在这种情况下,您可以使用进程控制系统,如Supervisor,来重启消费者进程,从而保持工作进程持续运行。
消息限制
由于PHP守护进程,尤其是基于框架的守护进程可能容易发生内存泄漏,因此限制要消费和停止的消息数量可能是合理的。
--memoryLimit, -l: (defaults to 0)
--messagesLimit, -m: (defaults to 0)
自动声明
默认情况下,扩展配置为自动声明模式,这意味着对于每个发布消息的交换、队列和绑定,都会进行检查和创建(如果缺失)。如果性能对您的应用程序非常重要,您应该在配置中禁用该功能,并使用控制台命令自行声明和删除路由模式。
用法
由于消费者工作进程将从队列中读取消息,执行回调方法并将消息传递给它。
消费
为了使类成为回调,它应该实现ConsumerInterface接口
<?php namespace components\rabbitmq; use rickcy\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class YourConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = $msg->body; // Apply your business logic here return ConsumerInterface::MSG_ACK; } }
您可以发布任何数据类型(对象、整数、数组等),尽管RabbitMQ在这里将负载作为字符串传输,但在消费者中的msgs->body,您的数据将与发送时的数据类型相同。
返回代码
关于返回代码,有多个代码以供您控制代理对消息的后续处理
- ConsumerInterface::MSG_ACK - 确认消息(标记为已处理)并从队列中删除
- ConsumerInterface::MSG_REJECT - 拒绝并从队列中删除消息
- ConsumerInterface::MSG_REJECT_REQUEUE - 拒绝并在RabbitMQ中重新入队消息
发布
以下是一个发布消息的示例
$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME'); $msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]); $producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY');
路由键作为第三个参数是可选的,这在fanout交换中是可能的。
默认情况下,仅在发布消息时才会建立与代理的连接,如果不需要,则不会在每次HTTP请求上尝试连接。
选项
所有配置选项
$rabbitmq_defaults = [ 'auto_declare' => true, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, 'type' => AMQPLazyConnection::class, 'url' => null, 'host' => null, 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'connection_timeout' => 3, 'read_write_timeout' => 3, 'ssl_context' => null, 'keepalive' => false, 'heartbeat' => 0, 'channel_rpc_timeout' => 0.0 ], ], 'exchanges' => [ [ 'name' => null, 'type' => null, 'passive' => false, 'durable' => true, 'auto_delete' => false, 'internal' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'queues' => [ [ 'name' => '', 'passive' => false, 'durable' => true, 'exclusive' => false, 'auto_delete' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'bindings' => [ [ 'exchange' => null, 'queue' => null, 'to_exchange' => null, 'routing_keys' => [], ], ], 'producers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'safe' => true, 'content_type' => 'text/plain', 'delivery_mode' => 2, 'serializer' => 'serialize', ], ], 'consumers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'callbacks' => [], 'qos' => [ 'prefetch_size' => 0, 'prefetch_count' => 0, 'global' => false, ], 'idle_timeout' => 0, 'idle_timeout_exit_code' => null, 'proceed_on_exception' => false, 'deserializer' => 'unserialize', ], ], 'logger' => [ 'log' => false, 'category' => 'application', 'print_console' => true, 'system_memory' => false, ], ];
交换
例如,为了声明一个交换,您应该提供其名称和类型。
使用arguments
参数的示例用法可以是创建一个死信交换。
队列
至于队列声明,所有参数都是可选的。即使您没有为您的队列提供名称,服务器也会为您生成一个唯一的名称。
有关选项、它们的默认值和重要细节的完整说明,可以在AMQP 0-9-1参考指南中找到。
请注意,并非所有这些选项都可以在“即席”更改,换句话说,在队列或交换创建后,否则,您将收到一个错误。
破坏性更改
从1.*版本开始,此扩展完全重新编写了内部代码,可以认为是全新的。然而,以下关键差异是可以区分的
- PHP版本7.0及以上要求
- 配置格式已更改
- 所有扩展组件都通过Yii2 Bootstraping自动加载
- 支持不同的连接类型
- 所有扩展组件都在DIC中注册为单例
- 添加了路由组件以控制代理中的模式
- 队列和交换的默认选项已更改
- 控制台命令合并到一个控制器类中,该类会自动添加,无需配置
- 添加了新的控制台命令来处理路由模式
- 支持所有数据类型作为消息负载
- 消费者以可预测的方式处理控制信号