ekazak / yii2-rabbitmq
基于php-amqplib的包装器,用于通过RabbitMQ将消息集成到您的Yii2应用程序中。受Symfony 2的RabbitMqBundle启发,这是一个非常棒的包。
Requires
- php: ^7.0
- php-amqplib/php-amqplib: ^2.9
- yiisoft/yii2: ^2.0.13
Requires (Dev)
- php-coveralls/php-coveralls: ^1.0
- phpunit/phpunit: ^6.4
README
基于php-amqplib库的包装器,用于通过RabbitMQ将消息集成到您的Yii2应用程序中。受Symfony框架的RabbitMqBundle启发。
本文档适用于版本2.*,需要PHP版本≥7.0。对于PHP版本≥5.4的旧应用程序,请使用此扩展的先前版本。
安装
安装此扩展的首选方式是通过composer。
运行以下命令之一
php composer.phar require ekazak/yii2-rabbitmq
或
"ekazak/yii2-rabbitmq": "^2.2.0"
将以下内容添加到您的composer.json
文件的require部分。
配置
此扩展可以方便地创建满足您特定需求的RabbitMQ生产者和消费者。以下是一个基本配置示例
<?php return [ // should be in common.php 'components' => [ // ... 'rabbitmq' => [ 'class' => \ekazak\rabbitmq\Configuration::class, 'connections' => [ [ // You can pass these parameters as a single `url` option: https://rabbitmq.cn/uri-spec.html 'host' => 'YOUR_HOSTNAME', 'port' => '5672', 'user' => 'YOUR_USERNAME', 'password' => 'YOUR_PASSWORD', 'vhost' => '/', ] // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks ], 'exchanges' => [ [ 'name' => 'YOUR_EXCHANGE_NAME', 'type' => 'direct' // Refer to Defaults section for all possible options ], ], 'queues' => [ [ 'name' => 'YOUR_QUEUE_NAME', // Queue can be configured here the way you want it: //'durable' => true, //'auto_delete' => false, ], [ 'name' => 'YOUR_ANOTHER_QUEUE_NAME', ], ], 'bindings' => [ [ 'queue' => 'YOUR_QUEUE_NAME', 'exchange' => 'YOUR_EXCHANGE_NAME', 'routing_keys' => ['YOUR_ROUTING_KEY'], ], ], 'producers' => [ [ 'name' => 'YOUR_PRODUCER_NAME', ], ], 'consumers' => [ [ 'name' => 'YOUR_CONSUMER_NAME', // Every consumer should define one or more callbacks for corresponding queues 'callbacks' => [ // queue name => callback class name 'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class, ], ], ], ], ], ];
要使用此扩展,您应熟悉RabbitMQ的基本概念。如果您对自己的知识没有信心,我建议阅读这篇文章。
'callback'参数可以是类名或依赖注入容器中的服务名。从Yii版本2.0.11开始,您可以这样配置您的容器
<?php use yii\di\Instance; return [ // ... 'container' => [ 'definitions' => [], 'singletons' => [ 'rabbitmq.import-data.consumer' => [ [ 'class' => \path\to\YourConsumer::class, ], [ // If dependency is needed 'some-dependency' => Instance::of('dependency-service-name'), ], ], ], ], ];
如果您需要多个消费者,可以在配置中列出相应的条目,但每个消费者都需要一个单独的工作者(守护进程)。如果您处理的是小队列并且消息消耗得很快,这可能是绝对可行的。您可以将它们组合成一个工作者。所以只需在消费者配置中列出您的回调,一个工作者将在多个队列上执行您的业务逻辑。
请确保所有队列和交换机都在相应的绑定中定义,正确设置消息路由取决于您。
生命周期事件
还实现了一些生命周期事件:before_consume,after_consume,before_publish,after_publish。您可以使用它们在消息被消费/发布之前或之后执行任何附加工作。例如,确保当消费者是一个长期运行的过程时,Yii知道数据库连接由于超时而关闭。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (\Yii::$app->has('db') && \Yii::$app->db->isActive) { try { \Yii::$app->db->createCommand('SELECT 1')->query(); } catch (\yii\db\Exception $exception) { \Yii::$app->db->close(); } } }, ], // ... ], ];
日志记录器
最后但同样重要的是,是日志记录器配置,这也是可选的。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'log' => true, 'category' => 'application', 'print_console' => false, 'system_memory' => false, ], ], // ... ], ];
默认情况下禁用日志记录器。启用时,它将日志消息记录到主应用程序日志或您指定的相应类别名称的自己的日志目标。'print_console'选项在控制台调试消费者时提供额外的信息。
示例
这里提供了使用RabbitMQ扩展的Yii2基本模板的简单设置。您可以自由地实验它并在隔离方式下调试现有的配置。
控制台命令
扩展提供了几个控制台命令
- rabbitmq/consume - 运行消费者
- rabbitmq/declare-all - 根据配置创建RabbitMQ交换机、队列和绑定
- rabbitmq/declare-exchange - 创建配置中列出的交换机
- rabbitmq/declare-queue - 创建配置中列出的队列
- rabbitmq/delete-all - 删除配置中定义的所有 RabbitMQ 交换机和队列
- rabbitmq/delete-exchange - 删除交换机
- rabbitmq/delete-queue - 删除队列
- rabbitmq/publish - 从 STDIN 发送消息到队列
- rabbitmq/purge-queue - 从队列中删除所有消息
启动消费者
yii rabbitmq/consume YOUR_CONSUMER_NAME
在这种情况下,您可以使用进程控制系统(如 Supervisor)来重启消费者进程,以此方式使您的工人持续运行。
消息限制
由于 PHP 守护进程特别是基于框架的可能会出现内存泄漏,因此限制要消费和停止的消息数量可能是合理的。
--memoryLimit, -l: (defaults to 0)
--messagesLimit, -m: (defaults to 0)
自动声明
默认情况下,扩展配置在自动声明模式下,这意味着每当发布消息时,都会检查交换机、队列和绑定是否存在,并在需要时创建它们。如果性能对您的应用程序非常重要,您应该在配置中禁用该功能,并使用控制台命令自行声明和删除路由模式。
用法
由于消费者工人将读取队列中的消息,执行回调方法并将消息传递给它。
消费
为了使类成为回调,它应实现 ConsumerInterface
<?php namespace components\rabbitmq; use mikemadisonweb\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class YourConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = $msg->body; // Apply your business logic here return ConsumerInterface::MSG_ACK; } }
您可以发布任何数据类型(对象、整数、数组等),尽管 RabbitMQ 将将有效载荷作为字符串传输,但在这里消费者 $msg->body 中的数据将与发送时相同。
返回代码
关于返回代码,有一系列它们,以便您可以通过代理控制消息的后续处理
- ConsumerInterface::MSG_ACK - 确认消息(标记为已处理)并将其从队列中删除
- ConsumerInterface::MSG_REJECT - 拒绝并从队列中删除消息
- ConsumerInterface::MSG_REJECT_REQUEUE - 拒绝并将消息重新入队到 RabbitMQ
发布
以下是如何发布消息的示例
$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME'); $msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]); $producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY');
路由键作为第三个参数是可选的,对于扇出交换机可能是这种情况。
默认情况下,只有在发布消息时才会建立与代理的连接,如果不需要,它不会在每次 HTTP 请求上尝试连接。
选项
所有配置选项
$rabbitmq_defaults = [ 'auto_declare' => true, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, 'type' => AMQPLazyConnection::class, 'url' => null, 'host' => null, 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'connection_timeout' => 3, 'read_write_timeout' => 3, 'ssl_context' => null, 'keepalive' => false, 'heartbeat' => 0, 'channel_rpc_timeout' => 0.0 ], ], 'exchanges' => [ [ 'name' => null, 'type' => null, 'passive' => false, 'durable' => true, 'auto_delete' => false, 'internal' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'queues' => [ [ 'name' => '', 'passive' => false, 'durable' => true, 'exclusive' => false, 'auto_delete' => false, 'nowait' => false, 'arguments' => null, 'ticket' => null, ], ], 'bindings' => [ [ 'exchange' => null, 'queue' => null, 'to_exchange' => null, 'routing_keys' => [], ], ], 'producers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'safe' => true, 'content_type' => 'text/plain', 'delivery_mode' => 2, 'serializer' => 'serialize', ], ], 'consumers' => [ [ 'name' => null, 'connection' => self::DEFAULT_CONNECTION_NAME, 'callbacks' => [], 'qos' => [ 'prefetch_size' => 0, 'prefetch_count' => 0, 'global' => false, ], 'idle_timeout' => 0, 'idle_timeout_exit_code' => null, 'proceed_on_exception' => false, 'deserializer' => 'unserialize', ], ], 'logger' => [ 'log' => false, 'category' => 'application', 'print_console' => true, 'system_memory' => false, ], ];
交换
例如,为了声明一个交换机,您应该提供其名称和类型。
使用 arguments
参数的好用例可以是创建一个 死信交换机。
队列
至于队列声明,所有参数都是可选的。即使您不提供队列名称,服务器也会为您生成一个唯一的名称。
有关选项、默认值和有用细节的完整说明,请参阅 AMQP 0-9-1 参考指南。
请注意,并非所有这些选项都可以“实时”更改,换句话说,在队列或交换机已创建之后。否则,您将收到错误。
重大更改
从版本 1.* 开始,此扩展完全重写了内部结构,可以被认为是全新的。然而,以下关键区别可以区分出来
- PHP 版本 7.0 和以上要求
- 配置格式已更改
- 所有扩展组件都使用 Yii2 Bootstraping 自动加载
- 支持不同的连接类型
- 所有扩展组件都注册在 DIC 中作为单例
- 添加了路由组件以控制代理中的模式
- 队列和交换机的默认选项已更改
- 控制台命令已合并到一个控制器类中,该类会自动添加,无需配置
- 新增控制台命令,用于操作路由模式
- 支持所有数据类型作为消息负载
- 消费者以可预测的方式处理控制信号