purplefan/php-rdkafka-bundle

集成了 php-rdkafka 与 Symfony2|3

1.0.1 2016-10-04 08:40 UTC

This package is not auto-updated.

Last update: 2024-09-18 20:33:32 UTC


README

关于

Symfony 扩展包提供与基于 rdkafka 绑定到 librdkafkaKafka 发布-订阅消息系统的连接

安装

在 composer.json 中添加依赖

{
    "require": {
        "mshauneu/php-rdkafka-bundle"
    }
}

在应用程序内核中启用扩展包

// app/AppKernel.php
public function registerBundles() {
    $bundles = array(
        // ...
        new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
    );
}

配置

简单的配置可能看起来像

mshauneu_rd_kafka:
  producers: 
    test_producer: 
      brokers: 127.0.0.1:9092
      topic: test_topic   
  consumers:
    test_consumer:
      brokers: 127.0.0.1:9092
      topic: test_topic   
      properties: 
        group_id: "test_group_id"
      topic_properties: 
        offset_store_method: broker           
        auto_offset_reset: smallest
        auto_commit_interval_ms: 100

配置属性在 CommunicatorConfiguration.php 中有文档说明

使用

向 Kafka 主题发布消息

从 Symfony 控制器

$payload = 'test_message';
$topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
$topicProducer->produceStart();
$topicProducer->produce("message");
$topicProducer->produceStop();

通过 CLI

./app/console kafka:producer --producer test_producer test_message 

从 Kafka 主题消费消息

实现 ConsumerInterface

class MessageHandler implements ConsumerInterface {
	public function consume($topic, $partition, $offset, $key, $payload) {
		echo "Received payload: " . $payload . PHP_EOL;
	}
}

注册它

test_message_handler:
    class: MessageHandler

从 Symfony 控制器

$topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_producer");
$topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
$topicConsumer->consume($consumerImpl);
$topicConsumer->consumeStop();

通过 CLI

./app/console kafka:consumer --consumer test_consumer --handler test_message_handler 

许可

此项目受 MIT 许可证许可。有关完整的许可证文本,请参阅 LICENSE 文件。