apptension / yii2-rabbitmq
基于 mikemadisonweb/yii2-rabbitmq 分支。基于 php-amqplib 的包装器,用于将消息集成到您的 Yii2 应用程序中,通过 RabbitMQ 实现。受 RabbitMqBundle for Symfony 2 启发,非常棒的包。
Requires
- php-amqplib/php-amqplib: ^3.1.0
- yiisoft/yii2: ^2.0
README
基于 php-amqplib 的包装器,用于将消息集成到您的 Yii2 应用程序中,通过 RabbitMQ 实现。受 RabbitMqBundle for Symfony 2 启发,非常棒的包。
安装
通过 composer 安装此扩展是首选方法。
运行
php composer.phar require --prefer-dist tanur/yii2-rabbitmq
或者将以下内容添加到您的 composer.json 文件的 require 部分:
"tanur/yii2-rabbitmq": "^1.0"
配置
此扩展简化了创建 RabbitMQ 生产者和消费者,以满足您的特定需求。以下是一个基本配置示例
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ 'class' => 'mikemadisonweb\rabbitmq\Configuration', 'connections' => [ 'default' => [ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'your_username', 'password' => 'your_password', 'vhost' => '/', 'heartbeat' => 0, ], ], 'producers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'import_data', 'type' => 'direct', ], ], ], 'consumers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'import_data', 'type' => 'direct', ], 'queue_options' => [ 'name' => 'import_data', ], 'callback' => \path\to\ImportDataConsumer::class, ], ], ], // ... ], // ... 'controllerMap' => [ 'rabbitmq-consumer' => \mikemadisonweb\rabbitmq\controllers\ConsumerController::className(), 'rabbitmq-producer' => \mikemadisonweb\rabbitmq\controllers\ProducerController::className(), ], // ... ];
将生产者视为消息的入口点,然后将其通过指定的连接细节传递到 RabbitMQ 队列。消费者是一个守护进程服务,它会从队列中取出消息并进行处理。
多个消费者
如果您需要多个消费者,可以在配置中列出相应的条目,但每个消费者都需要一个单独的工作进程(守护进程)。如果您处理的是小队列且消息消费速度很快,可以将它们组合到一个工作进程中。
以下是设置具有多个队列的消费者的方法
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'multipleConsumers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'exchange_name', 'type' => 'direct', ], 'queues' => [ 'import_data' => [ 'name' => 'import_data', 'callback' => \path\to\ImportDataConsumer::class, 'routing_keys' => ['import_data'], ], 'update_index' => [ 'name' => 'update_index', 'callback' => \path\to\UpdateIndexConsumer::class, 'routing_keys' => ['update_index'], ], ], ], ], ], // ... ], ];
请注意,所有队列都在同一个交换器下,设置正确的回调路由取决于您。
生命周期事件
还实现了一些生命周期事件:before_consume 和 after_consume。您可以使用它们在消息被消费前后执行任何额外的操作。例如,为了防止消费者作为长时间运行的过程而超时关闭数据库连接,可以重新打开数据库连接。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (isset(\Yii::$app->db)) { if (\Yii::$app->db->getIsActive()) { \Yii::$app->db->close(); } \Yii::$app->db->open(); } }, ], // ... ], ];
日志记录器
最后但同样重要的是,日志记录器配置也是可选的
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'enable' => true, 'category' => 'amqp', 'print_console' => true, ], ], // ... ], ];
默认启用日志记录器,但消息将被记录到主应用程序日志中。您可以通过设置自己的日志目标并指定相应的类别名称来更改此设置,如上例中的 'amqp'。默认禁用 'print_console' 选项,它会在您在控制台中调试消费者时提供额外的信息。
控制台命令
扩展提供了一些控制台命令
- rabbitmq-consumer/single - 运行消费者(每个队列一个实例)
- rabbitmq-consumer/multiple - 运行消费者(每个多个队列一个实例)
- rabbitmq-consumer/setup-fabric - 根据配置设置 RabbitMQ 交换器和队列
- rabbitmq-consumer/delete-all - 根据配置删除所有队列
- rabbitmq-producer/publish - 从 STDIN 发布消息到队列
其中最重要的命令是 single 和 multiple consumer 命令,因为它们分别基于 consumer 和 multipleConsumer 配置启动消费者进程。
由于 PHP 守护进程特别是基于框架的守护进程可能存在内存泄漏的风险,因此限制要消费和停止的消息数量可能是合理的。
yii rabbitmq-consumer/single import_data -m=10
在这种情况下,您可以使用进程控制系统,如 supervisor,来重新启动消费者进程,并以此方式使工作进程持续运行。
用法
消费者工作进程将读取队列中的消息,执行回调并将消息传递给它。回调类应实现 ConsumerInterface 接口
<?php namespace components\rabbitmq; use mikemadisonweb\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class ImportDataConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = unserialize($msg->body); if ($this->isValid($data)) { // Apply your business logic here return ConsumerInterface::MSG_ACK; } } }
您可以按自己的意愿格式化消息(JSON、XML等),唯一的要求是它应该是一个字符串。以下是一个如何发布消息的示例
\Yii::$app->rabbitmq->load(); $producer = \Yii::$container->get(sprintf('rabbit_mq.producer.%s', 'exchange_name')); $msg = serialize(['dataset_id' => $dataset->id, 'linked_datasets' => []]); $producer->publish($msg, 'import_data');
针对服务名称 'rabbit_mq.producer.%s' 的模板也作为常量 mikemadisonweb\rabbitmq\components\BaseRabbitMQ::PRODUCER_SERVICE_NAME 可用。这是必要的,因为生产者类是懒加载的,这意味着它们只有在需要时才会被创建。同样,连接类也是按需创建的,这意味着不会在每次请求时都建立与RabbitMQ的连接。