proklung/bitrix.rabbitmq.module

Bitrix 中 RabbitMQ 支持

安装: 73

依赖: 0

建议者: 0

安全: 0

星标: 8

关注者: 1

分支: 3

开放问题: 0

类型:bitrix-d7-module

1.4.3 2021-08-10 05:06 UTC

README

为什么?

原始模块专为 Bitrix 服务定位器设计,而它仅存在于比 20.5.400 更高的主模块版本中。在我的项目中没有这样的新版本,但功能有趣且有用。

差异

  • 为了单独的交响乐容器,移除了 Bitrix 服务定位器
  • 修复了一些错误
  • 添加了单独的控制台命令运行器
  • 适配了 RPC_ServerRPC_Clients 的工作
  • 适配了 Anon consumer 的工作
  • 适配了 Batch consumer 的工作
  • 内部容器被缓存

.settings.php 模块

return [
    'parameters' => [
        'value' => [
            
            'cache_path' => '/bitrix/cache/s1/proklung.rabbitmq', // Путь к закешированному контейнеру
            'compile_container_envs' => ['dev', 'prod'], // Окружения при которых компилировать контейнер
            'container.dumper.inline_factories' => false, // Дампить контейнер как одиночные файлы
        ],
        'readonly' => false,
    ]
];

参数 cache_path - 存储编译后容器的路径。如果没有设置,则默认为 /bitrix/cache/s1/proklung.rabbitmq

预计系统中已以某种方式安装了环境变量 DEBUG$_ENV 数组中。如果没有,则默认假设环境为“调试模式”。

参数(数组)compile_container_envs 指定了需要缓存容器的环境。

目前的简单逻辑是:$_ENV["DEBUG"] === true => 环境为 dev,否则 prod

带有一些修改的原版 readme.MD

关于项目

该模块通过使用 php-amqplib 库,允许在 PHP 应用程序中通过 RabbitMQ 进行消息交换。

该包实现了在 Thumper 库中定义的消息交换模板,这使得从控制器发布 RabbitMQ 中的消息变得非常简单

use Proklung\RabbitMq\Integration\DI\Services;

$msg = ['user_id' => 1235, 'image_path' => '/path/to/new/pic.png'];
Services::getInstance()->get('rabbitmq.upload_picture_producer')->publish(serialize($msg));

要获取队列 upload_pictures 中的 50 条消息,只需启动监听器即可

use Proklung\RabbitMq\Integration\DI\Services;

$consumer = Services::getInstance()->get('rabbitmq.upload_picture_consumer');
$consumer->consume(50);

以下示例需要启动 RabbitMQ 服务器。

最低要求

  • php-7.1.3 或更高

安装

使用包管理器 composer 下载包

主项目的 composer.json

  "extra": {
    "installer-paths": {
      "./bitrix/modules/{$name}/": ["type:bitrix-d7-module", "type:bitrix-module"],
      "./bitrix/components/{$name}/": ["type:bitrix-d7-component", "type:bitrix-component"],
      "./bitrix/templates/{$name}/": ["type:bitrix-d7-template", "type:bitrix-theme"]
    },
    "repositories": [
        {
            "type": "git",
            "url": "https://github.com/proklung/bitrix.rabbitmq.module"
        },
    {
      "type": "git",
      "url": "https://github.com/proklung/bitrix.containerable.boilerplate"
    }
    ]
  }
$ composer require proklung/bitrix.rabbitmq.module

在网站的行政界面 bitrix/admin/partner_modules.php 中安装模块 proklung.rabbitmq

将以下代码添加到您的 init.php

use Bitrix\Main\Loader;
use Proklung\RabbitMq\Integration\DI\Services;

if (Loader::includeModule('proklung.rabbitmq')) {
    Services::boot();
}

使用

配置与父包相同。通过修改文件 bitrix/.settings.phpbitrix/.settings_extra.php 进行设置

return [
    'rabbitmq' => [
        'value' => [
            'connections' => [
                'default' => [
                    'host' => '172.17.0.2',
                    'port' => 5672,
                    'user' => 'guest',
                    'password' => 'guest',
                    'vhost' => '/',
                    'lazy' => false,
                    'connection_timeout' => 3.0,
                    'read_write_timeout' => 3.0,
                    'keepalive' => false,
                    'heartbeat' => 0,
                    'use_socket' => true,
                ],
            ],
            'producers' => [
                'upload_picture' => [
                    'connection' => 'default',
                    'exchange_options' => [
                        'name' => 'upload_picture',
                        'type' => 'direct',
                    ],
                ],
            ],
            'consumers' => [
                'upload_picture' => [
                    'connection' => 'default',
                    'exchange_options' => [
                        'name' => 'upload_picture',
                        'type' => 'direct',
                    ],
                    'queue_options' => [
                        'name' => 'upload_picture',
                    ],
                    // Автоматом регистрируется сервисом. Без обработки зависимостей.
                    'callback' => 'Proklung\RabbitMq\Consumers\UploadPictureConsumer',
                ],
            ],
            'rpc_clients' => [
                'integer_store' => [
                    'connection' => 'default',
                    'unserializer' => 'json_decode',
                    'lazy' => true,
                    'direct_reply_to' => false,
                    'expect_serialized_response' => false
                ],
            ],
            'rpc_servers' => [
                'random_int' => [
                    'connection' => 'default',
                    // Автоматом регистрируется сервисом. Без обработки зависимостей.
                    'callback' => 'Proklung\RabbitMq\Examples\RandomIntServer',
                    'qos_options' => [
                        'prefetch_size' => 0,
                        'prefetch_count' => 1,
                        'global' => false
                    ],
                    'exchange_options' => [
                        'name' => 'random_int',
                        'type' => 'topic',
                    ],
                    'queue_options' => [
                        'name' => 'random_int_queue',
                        'durable' => false,
                        'auto_delete' => true,
                    ],
                    'serializer' => 'json_encode',
                ],
            ],
        ],
        'readonly' => false,
    ],
];

消息处理器示例

// UploadPictureConsumer.php

use Proklung\RabbitMq\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        echo ' [x] Received ', $msg->body, "\n";
    }
}

RPC 消息服务器端示例(当客户端选项 expect_serialized_response 等于 false 时)

use PhpAmqpLib\Message\AMQPMessage;

class RandomIntServer
{
    public function execute(AMQPMessage $request)
    {
        $params = json_decode($request->getBody(), true);
        
        return ['request_id' => mt_rand(1, 123)];
    }
}

发送请求并从 RPC 获取响应

  1. 使用命令 php bin/rabbitmq bitrix-rabbitmq:rpc-server random_int 启动服务器
  2. 代码
    use Proklung\RabbitMq\Integration\DI\Services;
    use Proklung\RabbitMq\RabbitMq\RpcClient;

    /** @var RpcClient $client */
    $client = Services::boot()->get('rabbitmq.integer_store_rpc');

    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
    // Обработать $replies

CLI 集成

提供了一些简化工作的命令

  • rabbitmq:consumer 执行消费者
  • rabbitmq:delete 删除消费者的队列
  • rabbitmq:purge 清空消费者的队列
  • rabbitmq:setup-fabric 设置 RabbitMQ 架构
  • rabbitmq:stdin-producer 执行从 STDIN 读取数据的生产者
  • rabbitmq:rpc-server 启动 RPC 服务器

模块的 /install/bin 目录中有一个名为 rabbitmq 的文件。安装模块时,系统会尝试将其复制到位于 DOCUMENT_ROOT 上两级目录的 bin 目录中。如果该目录不存在,则不会进行任何操作。需要手动创建该目录并手动复制文件。

启动

   php bin/rabbitmq bitrix-abbitmq:setup-fabric

所有可用命令

   php bin/rabbitmq

适配 Bitrix

  • 连接(Stream、Socket、Lazy、LazySocket)
  • 连接工厂
  • 绑定
  • 生产者
  • 消费者
  • 部件持有者
  • 回退生产者
  • 多消费者
  • 动态消费者
  • 批量消费者
  • 匿名消费者
  • Rpc客户端
  • Rpc服务器
  • 日志通道

信用

本模块和文档基于 RabbitMqBundle。在那里您可以找到有关其使用的详细信息。