mshauneu/php-rdkafka-bundle

整合php-rdkafka与Symfony2|3

1.0.5 2017-04-08 06:40 UTC

This package is not auto-updated.

Last update: 2024-09-23 13:01:19 UTC


README

关于

Symfony 包提供了对基于 rdkafka 绑定到 librdkafkaKafka 发布-订阅消息系统的连接

安装

在您的 composer.json 中添加依赖

{
    "require": {
        "mshauneu/php-rdkafka-bundle"
    }
}

在您的应用程序内核中启用该包

// app/AppKernel.php
public function registerBundles() {
    $bundles = array(
        // ...
        new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
    );
}

配置

简单的配置可能看起来像

mshauneu_rd_kafka:
  producers: 
    test_producer: 
      brokers: 127.0.0.1:9092
      topic: test_topic   
  consumers:
    test_consumer:
      brokers: 127.0.0.1:9092
      topic: test_topic   
      properties: 
        group_id: "test_group_id"
      topic_properties: 
        offset_store_method: broker           
        auto_offset_reset: smallest
        auto_commit_interval_ms: 100

配置属性在 CommunicatorConfiguration.php 中有文档说明

使用方法

向Kafka主题发布消息

从Symfony控制器

$payload = 'test_message';
$topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
$topicProducer->produceStart();
$topicProducer->produce("message");
$topicProducer->produceStop();

通过CLI

./app/console kafka:producer --producer test_producer test_message 

从Kafka主题中消费消息

实现 ConsumerInterface

class MessageHandler implements ConsumerInterface {
	public function consume($topic, $partition, $offset, $key, $payload) {
		echo "Received payload: " . $payload . PHP_EOL;
	}
}

注册它

test_message_handler:
    class: MessageHandler

从Symfony控制器

$topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_producer");
$topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
$topicConsumer->consume($consumerImpl);
$topicConsumer->consumeStop();

通过CLI

./app/console kafka:consumer --consumer test_consumer --handler test_message_handler 

许可证

本项目受MIT许可证的约束。有关完整的许可证文本,请参阅 LICENSE 文件。