proklung / bitrix.rabbitmq.module
Bitrix 中 RabbitMQ 支持
Requires
- php: >=7.1 || ^8.0
- ext-json: *
- composer/installers: ~1
- php-amqplib/php-amqplib: ^2.12.1 || ^3
- proklung/bitrix-containerable-boilerplate: ^1.1
- proklung/bitrix.module.boilerplate: ^1.0
- psr/log: ^1.1.3
Requires (Dev)
- icanhazstring/composer-unused: ^0.7.5
README
为什么?
原始模块专为 Bitrix 服务定位器设计,而它仅存在于比 20.5.400
更高的主模块版本中。在我的项目中没有这样的新版本,但功能有趣且有用。
差异
- 为了单独的交响乐容器,移除了 Bitrix 服务定位器
- 修复了一些错误
- 添加了单独的控制台命令运行器
- 适配了
RPC_Server
和RPC_Clients
的工作 - 适配了
Anon consumer
的工作 - 适配了
Batch consumer
的工作 - 内部容器被缓存
.settings.php
模块
return [ 'parameters' => [ 'value' => [ 'cache_path' => '/bitrix/cache/s1/proklung.rabbitmq', // Путь к закешированному контейнеру 'compile_container_envs' => ['dev', 'prod'], // Окружения при которых компилировать контейнер 'container.dumper.inline_factories' => false, // Дампить контейнер как одиночные файлы ], 'readonly' => false, ] ];
参数 cache_path
- 存储编译后容器的路径。如果没有设置,则默认为 /bitrix/cache/s1/proklung.rabbitmq
。
预计系统中已以某种方式安装了环境变量 DEBUG
到 $_ENV
数组中。如果没有,则默认假设环境为“调试模式”。
参数(数组)compile_container_envs
指定了需要缓存容器的环境。
目前的简单逻辑是:$_ENV["DEBUG"] === true
=> 环境为 dev
,否则 prod
。
带有一些修改的原版 readme.MD
关于项目
该模块通过使用 php-amqplib 库,允许在 PHP 应用程序中通过 RabbitMQ 进行消息交换。
该包实现了在 Thumper 库中定义的消息交换模板,这使得从控制器发布 RabbitMQ 中的消息变得非常简单
use Proklung\RabbitMq\Integration\DI\Services; $msg = ['user_id' => 1235, 'image_path' => '/path/to/new/pic.png']; Services::getInstance()->get('rabbitmq.upload_picture_producer')->publish(serialize($msg));
要获取队列 upload_pictures
中的 50 条消息,只需启动监听器即可
use Proklung\RabbitMq\Integration\DI\Services; $consumer = Services::getInstance()->get('rabbitmq.upload_picture_consumer'); $consumer->consume(50);
以下示例需要启动 RabbitMQ 服务器。
最低要求
php-7.1.3
或更高
安装
使用包管理器 composer 下载包
主项目的 composer.json
"extra": { "installer-paths": { "./bitrix/modules/{$name}/": ["type:bitrix-d7-module", "type:bitrix-module"], "./bitrix/components/{$name}/": ["type:bitrix-d7-component", "type:bitrix-component"], "./bitrix/templates/{$name}/": ["type:bitrix-d7-template", "type:bitrix-theme"] }, "repositories": [ { "type": "git", "url": "https://github.com/proklung/bitrix.rabbitmq.module" }, { "type": "git", "url": "https://github.com/proklung/bitrix.containerable.boilerplate" } ] }
$ composer require proklung/bitrix.rabbitmq.module
在网站的行政界面 bitrix/admin/partner_modules.php
中安装模块 proklung.rabbitmq
将以下代码添加到您的 init.php
use Bitrix\Main\Loader; use Proklung\RabbitMq\Integration\DI\Services; if (Loader::includeModule('proklung.rabbitmq')) { Services::boot(); }
使用
配置与父包相同。通过修改文件 bitrix/.settings.php
和 bitrix/.settings_extra.php
进行设置
return [ 'rabbitmq' => [ 'value' => [ 'connections' => [ 'default' => [ 'host' => '172.17.0.2', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'lazy' => false, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, 'keepalive' => false, 'heartbeat' => 0, 'use_socket' => true, ], ], 'producers' => [ 'upload_picture' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'upload_picture', 'type' => 'direct', ], ], ], 'consumers' => [ 'upload_picture' => [ 'connection' => 'default', 'exchange_options' => [ 'name' => 'upload_picture', 'type' => 'direct', ], 'queue_options' => [ 'name' => 'upload_picture', ], // Автоматом регистрируется сервисом. Без обработки зависимостей. 'callback' => 'Proklung\RabbitMq\Consumers\UploadPictureConsumer', ], ], 'rpc_clients' => [ 'integer_store' => [ 'connection' => 'default', 'unserializer' => 'json_decode', 'lazy' => true, 'direct_reply_to' => false, 'expect_serialized_response' => false ], ], 'rpc_servers' => [ 'random_int' => [ 'connection' => 'default', // Автоматом регистрируется сервисом. Без обработки зависимостей. 'callback' => 'Proklung\RabbitMq\Examples\RandomIntServer', 'qos_options' => [ 'prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false ], 'exchange_options' => [ 'name' => 'random_int', 'type' => 'topic', ], 'queue_options' => [ 'name' => 'random_int_queue', 'durable' => false, 'auto_delete' => true, ], 'serializer' => 'json_encode', ], ], ], 'readonly' => false, ], ];
消息处理器示例
// UploadPictureConsumer.php use Proklung\RabbitMq\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; class UploadPictureConsumer implements ConsumerInterface { public function execute(AMQPMessage $msg) { echo ' [x] Received ', $msg->body, "\n"; } }
RPC 消息服务器端示例(当客户端选项 expect_serialized_response
等于 false
时)
use PhpAmqpLib\Message\AMQPMessage; class RandomIntServer { public function execute(AMQPMessage $request) { $params = json_decode($request->getBody(), true); return ['request_id' => mt_rand(1, 123)]; } }
发送请求并从 RPC 获取响应
- 使用命令
php bin/rabbitmq bitrix-rabbitmq:rpc-server random_int
启动服务器 - 代码
use Proklung\RabbitMq\Integration\DI\Services; use Proklung\RabbitMq\RabbitMq\RpcClient; /** @var RpcClient $client */ $client = Services::boot()->get('rabbitmq.integer_store_rpc'); $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id'); $replies = $client->getReplies(); // Обработать $replies
CLI 集成
提供了一些简化工作的命令
rabbitmq:consumer
执行消费者rabbitmq:delete
删除消费者的队列rabbitmq:purge
清空消费者的队列rabbitmq:setup-fabric
设置 RabbitMQ 架构rabbitmq:stdin-producer
执行从 STDIN 读取数据的生产者rabbitmq:rpc-server
启动 RPC 服务器
模块的 /install/bin
目录中有一个名为 rabbitmq
的文件。安装模块时,系统会尝试将其复制到位于 DOCUMENT_ROOT
上两级目录的 bin
目录中。如果该目录不存在,则不会进行任何操作。需要手动创建该目录并手动复制文件。
启动
php bin/rabbitmq bitrix-abbitmq:setup-fabric
所有可用命令
php bin/rabbitmq
适配 Bitrix
- 连接(Stream、Socket、Lazy、LazySocket)
- 连接工厂
- 绑定
- 生产者
- 消费者
- 部件持有者
- 回退生产者
- 多消费者
- 动态消费者
- 批量消费者
- 匿名消费者
- Rpc客户端
- Rpc服务器
- 日志通道
信用
本模块和文档基于 RabbitMqBundle。在那里您可以找到有关其使用的详细信息。