adexos / m2-kafka-connector
支持Avro的开源Kafka模块
v1.0.5
2024-09-03 14:12 UTC
Requires
- php: ^8.1
- ext-rdkafka: *
- flix-tech/avro-serde-php: ^2.1
- koco/messenger-kafka: ^0.18.0
- phpdocumentor/reflection-docblock: ^5.3
- symfony/serializer: >=5.4
README
本模块是 koco/messenger-kafka 在Magento 2中的桥梁。
它基于Magento 2原生队列系统构建,并对设置进行了调整。
它还处理使用Avro Schemes的读取。
免责声明
目前只完成了读取部分。您无法向队列写入。
安装
composer require adexos/m2-kafka-connector
使用
声明您的配置
为此,您可以在您的 app/code/Namespace/Module/etc/adminhtml/system.xml
中包含kafka配置表
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Config:etc/system_file.xsd"> <system> <section id="kafka" translate="label" type="text" sortOrder="300" showInDefault="1"> <group id="warehouse" translate="" type="text" sortOrder="20" showInDefault="1"> <label>Warehouse</label> <include path="Adexos_KafkaConnector::includes/kafka_conf_included.xml"/> </group> </section> </system> </config>
请注意您设置的 group id
,它将用于将队列运行器连接到Kafka代理
您可以在以下位置找到配置:商店 -> 配置 -> 服务 -> Kafka
添加队列系统
通信
app/code/Namespace/Module/etc/communication.xml
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd"> <topic name="warehouse.update.stock" request="Namespace\Module\Model\StockMessage"/> </config>
request
字段将用作您收到的消息的模型。
由于Magento 2的要求,此类 必须 使用 PhpDoc 完全类型化
请注意,
topic name
不一定需要与您要查找的Kafka队列同名。由于大多数Kafka队列都在同一代理中,但名称根据环境不同而不同,我们无法像对其他连接类型(如db
队列)那样在.xml中定义它们相反,真实的Kafka主题名称必须定义在您之前设置的系统中。
队列消费者
app/code/Namespace/Module/etc/queue_consumer.xml
<?xml version="1.0"?> <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd"> <consumer name="warehouse" queue="warehouse.stock.update" connection="kafka.warehouse" handler="Namespace\Module\Handler\StockMessageHandler::handle"/> </config>
queue
:根据Magento文档,它必须与communication.xml
文件中定义的主题名称相同connection
:请注意,Kafka连接 必须 以kafka.
开头,例如:kafka.warehouse
。这样做是为了检测 所有 Kafka连接类型并在配置中检索它们
如果连接是
kafka.warehouse
,则系统.xml文件中定义的group id必须是warehouse
。这允许我们通过core_config_data
表自动映射
handler
:将接收您的消息并对其进行处理的处理程序。参数类型必须与communication.xml
中request
字段中定义的类型相同