tanur / yii2-rabbitmq
基于 mikemadisonweb/yii2-rabbitmq 分支开发。基于 php-amqplib 的包装器,可以将消息集成到您的 Yii2 应用程序中,通过 RabbitMQ 进行通信。灵感来源于 Symfony 2 的 RabbitMqBundle,是一个非常棒的包。
Requires
- php-amqplib/php-amqplib: ^3.1.0
- yiisoft/yii2: ^2.0
README
基于 php-amqplib 的包装器,可以将消息集成到您的 Yii2 应用程序中,通过 RabbitMQ 进行通信。灵感来源于 Symfony 2 的 RabbitMqBundle,是一个非常棒的包。
安装
安装此扩展的首选方法是通过 composer。
运行以下命令之一
php composer.phar require --prefer-dist tanur/yii2-rabbitmq
或
"tanur/yii2-rabbitmq": "^1.0"
将其添加到您的 composer.json 文件的 require 部分。
配置
此扩展简化了创建 RabbitMQ 生产者和消费者 以满足您特定需求的过程。以下是一个基本的配置示例
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ 'class' => 'mikemadisonweb\rabbitmq\Configuration', 'connections' => [ 'default' => [ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'your_username', 'password' => 'your_password', 'vhost' => '/', 'heartbeat' => 0, ], ], 'producers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'import_data', 'type' => 'direct', ], ], ], 'consumers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'import_data', 'type' => 'direct', ], 'queue_options' => [ 'name' => 'import_data', ], 'callback' => \path\to\ImportDataConsumer::class, ], ], ], // ... ], // ... 'controllerMap' => [ 'rabbitmq-consumer' => \mikemadisonweb\rabbitmq\controllers\ConsumerController::className(), 'rabbitmq-producer' => \mikemadisonweb\rabbitmq\controllers\ProducerController::className(), ], // ... ];
将生产者视为消息的入口点,然后使用指定的连接细节将其传递给 RabbitMQ 队列。消费者是一个守护进程服务,它会从队列中取消息并处理它们。
多个消费者
如果您需要多个消费者,您可以在配置中列出相应的条目,但这需要为每个消费者分配一个单独的工人(守护进程)。如果您处理的队列较小且消息消费速度很快,您可能希望将它们组合到一个工人中。
以下是设置具有多个队列的消费者
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'multipleConsumers' => [ 'import_data' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'exchange_name', 'type' => 'direct', ], 'queues' => [ 'import_data' => [ 'name' => 'import_data', 'callback' => \path\to\ImportDataConsumer::class, 'routing_keys' => ['import_data'], ], 'update_index' => [ 'name' => 'update_index', 'callback' => \path\to\UpdateIndexConsumer::class, 'routing_keys' => ['update_index'], ], ], ], ], ], // ... ], ];
请注意,所有队列都在同一个交换器下,设置正确的回调路由取决于您。
生命周期事件
还实现了几个生命周期事件:before_consume 和 after_consume。您可以使用它们在消息消费前后执行任何附加工作。例如,在消费者作为长时间运行进程而不会因超时而关闭数据库连接之前重新打开数据库连接。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'on before_consume' => function ($event) { if (isset(\Yii::$app->db)) { if (\Yii::$app->db->getIsActive()) { \Yii::$app->db->close(); } \Yii::$app->db->open(); } }, ], // ... ], ];
日志记录器
最后但并非最不重要的是,日志记录器配置也是可选的。
<?php // config/main.php return [ // ... 'components' => [ // ... 'rabbitmq' => [ // ... 'logger' => [ 'enable' => true, 'category' => 'amqp', 'print_console' => true, ], ], // ... ], ];
默认启用日志记录器,但它将消息记录到主应用程序日志中。您可以设置自己的日志目标并指定相应的类别名称,如上面设置的 'amqp'。默认禁用 'print_console' 选项,它可以在您在控制台调试消费者时提供额外的信息。
控制台命令
扩展提供了一些控制台命令。
- rabbitmq-consumer/single - 运行消费者(每个队列一个实例)
- rabbitmq-consumer/multiple - 运行消费者(每个多个队列一个实例)
- rabbitmq-consumer/setup-fabric - 根据配置设置 RabbitMQ 交换器和队列
- rabbitmq-consumer/delete-all - 根据配置删除所有队列
- rabbitmq-producer/publish - 从 STDIN 发布消息到队列
其中最重要的是单消费者和多消费者命令,因为它们根据 consumer 和 multipleConsumer 配置分别启动消费者进程。
由于 PHP 守护进程特别是基于框架的可能会出现内存泄漏,因此合理地限制要消费和停止的消息数量可能是合理的。
yii rabbitmq-consumer/single import_data -m=10
在这种情况下,您可以使用进程控制系统,如 supervisor,来重新启动消费者进程,这样就可以持续运行您的工人。
用法
消费者工作进程将读取队列中的消息,然后执行回调并将消息传递给它。回调类应实现 ConsumerInterface。
<?php namespace components\rabbitmq; use mikemadisonweb\rabbitmq\components\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class ImportDataConsumer implements ConsumerInterface { /** * @param AMQPMessage $msg * @return bool */ public function execute(AMQPMessage $msg) { $data = unserialize($msg->body); if ($this->isValid($data)) { // Apply your business logic here return ConsumerInterface::MSG_ACK; } } }
您可以根据自己的需求格式化消息(JSON、XML等),唯一的要求是它应该是一个字符串。以下是一个如何发布消息的示例
\Yii::$app->rabbitmq->load(); $producer = \Yii::$container->get(sprintf('rabbit_mq.producer.%s', 'exchange_name')); $msg = serialize(['dataset_id' => $dataset->id, 'linked_datasets' => []]); $producer->publish($msg, 'import_data');
服务名称模板 'rabbit_mq.producer.%s' 也可用作常量 mikemadisonweb\rabbitmq\components\BaseRabbitMQ::PRODUCER_SERVICE_NAME。这是必需的,因为生产者类是懒加载的,这意味着它们只有在需要时才会被创建。同样,连接类也是按需创建的,这意味着不会在每次请求时都建立与RabbitMQ的连接。