rossriley/kafka-php

Kafka consumer and producer library for PHP

dev-master / 1.0.x-dev 2014-03-29 11:26 UTC

This package is not auto-updated.

Last update: 2024-09-14 15:48:17 UTC


README

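This is an alternative to the existing Kafka PHP client, which is in the incubator. The main motivation for writing it was that fetch requests should not be loaded entirely into memory but pulled continuously from the socket. In addition, PHP has a different control flow and communication pattern (each request is a thread within the HTTP server), so the API does not need to follow the Scala/Java object graph and can be much simpler.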

There are a few differences from the existing Kafka PHP client:

- Streaming message individually rather than loading the whole response 
  into memory
- Offset implemented by hexadecimal transformation to fully support Kafka 
  long offsets
- Gzip working correctly both ways, including the pre-compression message 
  header
- Messages produced in batch consumed correctly in compressed as well 
  as uncompressed state
- CRC32 check working
- Producers and Consumers are abstracted to allow for changes in Kafka 
  API without disrupting the client code
- Broker abstraction for different connection strategies
- OffsetRequest workaround for 64-bit unix timestamp
- Produce request only checks correct bytes were sent (ack not available)
- Producer compresses batches of consecutive messages with same 
  compression codec as a single message
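
To illustrate the hexadecimal offset idea from the list above, here is a minimal sketch of how a 64-bit offset can be carried as a 16-digit hex string and advanced without relying on native 64-bit integers. The function names (`offsetFromWire`, `offsetToWire`, `offsetAdd`) are hypothetical and are not taken from the library's API; the sketch only assumes that offsets travel as 8 big-endian bytes.

```php
<?php
// Hypothetical helpers for illustration; not part of kafka-php's API.

/** Convert an 8-byte big-endian offset into a 16-digit hex string. */
function offsetFromWire(string $bytes): string
{
    if (strlen($bytes) !== 8) {
        throw new InvalidArgumentException("expected exactly 8 bytes");
    }
    return bin2hex($bytes);
}

/** Convert a hex offset (up to 16 digits) back into 8 big-endian bytes. */
function offsetToWire(string $hex): string
{
    return hex2bin(str_pad($hex, 16, "0", STR_PAD_LEFT));
}

/**
 * Add a small non-negative integer (for example a message size) to a hex
 * offset, working digit by digit so no 64-bit integer is ever needed.
 */
function offsetAdd(string $hex, int $delta): string
{
    if ($delta < 0) {
        throw new InvalidArgumentException("delta must be non-negative");
    }
    $digits = str_pad(strtolower($hex), 16, "0", STR_PAD_LEFT);
    $carry  = $delta;
    for ($i = 15; $i >= 0 && $carry > 0; $i--) {
        $sum        = hexdec($digits[$i]) + $carry;
        $digits[$i] = dechex($sum % 16);   // keep the lowest hex digit
        $carry      = intdiv($sum, 16);    // push the rest to the left
    }
    return $digits;
}
```

For example, `offsetAdd("00000000ffffffff", 1)` crosses the 32-bit boundary and yields `"0000000100000000"`, which a 32-bit PHP integer could not represent.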

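Console scripts

A set of example scripts lives under the 'scripts' folder. The parameter conventions are shared across all scripts, although each script accepts a different subset of parameters.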

-c  Connector, sets the Zookeeper server to connect to
-b  Broker, sets the Kafka broker to connect to
-t  Topic, sets the topic to produce to or consume from
-m  Message, sets the message you want to produce
-l  List, lists the available topics
-o  Offset (optional), sets the offset to start
    consuming from
-h  Help, displays the help for the script
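
As a rough sketch of how a script might read the options above, PHP's built-in `getopt()` can be combined with a small validation step. The helper name `parseScriptOptions` and the shape of the array it returns are assumptions made for illustration; the actual scripts may parse their arguments differently.

```php
<?php
// Sketch only: parseScriptOptions() is a hypothetical helper,
// not code taken from the kafka-php scripts.

/**
 * Validate the array returned by getopt("c:b:t:m:o:lh").
 * getopt() reports a flag without a value (such as -l or -h) as false,
 * so flags are tested for key presence, not for truthiness.
 */
function parseScriptOptions(array $opts): array
{
    if (array_key_exists("h", $opts)) {
        return ["action" => "help"];
    }
    if (!isset($opts["c"]) && !isset($opts["b"])) {
        throw new InvalidArgumentException("either -c or -b is required");
    }
    $parsed = [
        "action"    => array_key_exists("l", $opts) ? "list" : "run",
        "connector" => $opts["c"] ?? null,
        "broker"    => $opts["b"] ?? null,
        "topic"     => $opts["t"] ?? null,
        "message"   => $opts["m"] ?? null,
        "offset"    => $opts["o"] ?? null,   // optional starting offset
    ];
    if ($parsed["action"] === "run" && $parsed["topic"] === null) {
        throw new InvalidArgumentException("-t is required");
    }
    return $parsed;
}

// Typical call site (a trailing colon marks an option that takes a value):
// $options = parseScriptOptions(getopt("c:b:t:m:o:lh"));
```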

The following scripts are available:

./scripts/simple/producer -b {broker} -t {topic}
./scripts/simple/producer -b hq-pau-d02:9092 -t test-topic

./scripts/simple/consumer -b {broker} -t {topic} [-o {offset}]
./scripts/simple/consumer -b hq-pau-d02:9092 -t test-topic

./scripts/producers/producer -c {connector} -t {topic} -m {message}
./scripts/producers/producer -c hq-pau-d02:2181 -t test-topic -m "Hello"

./scripts/producers/cached -c {connector} -t {topic} -m {message}
./scripts/producers/cached -c hq-pau-d02:2181 -t test-topic -m "Hello"

./scripts/producers/partitioned -c {connector} -t {topic} -m {message}
./scripts/producers/partitioned -c hq-pau-d02:2181 -t test-topic -m "Hello"

./scripts/producers/daemon -c {connector} -t {topic}
./scripts/producers/daemon -c hq-pau-d02:2181 -t test-topic

./scripts/consumers/consumer -c {connector} -t {topic} 
./scripts/consumers/consumer -c hq-pau-d02:2181 -t test-topic 

./scripts/consumers/daemon -c {connector} -t {topic}
./scripts/consumers/daemon -c hq-pau-d02:2181 -t test-topic

Unit tests

The tests are a set of native PHP assert() calls included in a main runner:

$> ./test

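Tutorial

This is not a full tutorial; it only shows how to create a simple producer and consumer to illustrate how the kafka-php library is used.

Simple producer

This code sends a message to the given topic.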

// require kafka-php library                            
require "kafka-php/src/Kafka/Kafka.php";                
                                                        
$connector = "hq-pau-d02:2181";                         
$topic     = "test-topic";                              
$message   = "Hello world!";                            
                                                        
$producer = \Kafka\ProducerConnector::Create($connector);
                                                        
// add the message                                      
$producer->addMessage($topic, $message);                
                                                        
// produce the actual messages into kafka               
$producer->produce();                          

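Simple consumer

This shows how to create a consumer and consume a single message. It is not very useful in itself, but it illustrates the basics.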

// require kafka-php library                           
require "kafka-php/src/Kafka/Kafka.php";               
                                                       
// setting variables                                   
$connector = "hq-pau-d02:2181";                        
$topic     = "test-topic";                             
                                                       
// create the connector                                
$cc = \Kafka\ConsumerConnector::Create($connector);    
                                                       
// create the message stream, we point to the beginning
// of the topic offset                                 
$messageStream = $cc->createMessageStreams(            
    $topic,                                            
    65535,                                             
    \Kafka\Kafka::OFFSETS_EARLIEST                     
);                                                     
                                                       
// get the message                                     
$message = $messageStream[0]->nextMessage();           
                                                       
// output the message                                  
echo $message->payload() ."\n";                        

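Consume all messages from a specific topic

This consumer does much the same, but it consumes all messages of the given topic, starting from the beginning (offset = 0).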

// require kafka-php library                                
require "kafka-php/src/Kafka/Kafka.php";                    
                                                            
// setting variables                                        
$connector = "hq-pau-d02:2181";                             
$topic     = "test-topic";                                  
                                                            
// create the connector                                     
$cc = \Kafka\ConsumerConnector::Create($connector);         
                                                            
// create the message stream, we point to the beginning     
// of the topic offset                                      
$messageStreams = $cc->createMessageStreams(                
    $topic,                                                 
    65535,                                                  
    \Kafka\Kafka::OFFSETS_EARLIEST                          
);                                                          
                                                            
// infinite loop                                            
while (true) {                                              
    $fetchCount = 0;                                        
                                                            
    foreach ($messageStreams as $mid => $messageStream) {   
        while ($message = $messageStream->nextMessage()) {  
            $fetchCount++;                                  
            echo $message->payload() . "\n";                
        }                                                   
    }                                                       
                                                            
    if ($fetchCount == 0) {                                 
        echo " --- no more messages ---\n";                 
        die;                                                
    }                                                       
}                                                           
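
The inner draining loop above can be factored into a small helper, which makes it easier to reuse and to exercise without a broker. In this sketch, `StubMessage` and `StubStream` are hypothetical stand-ins that only mimic `payload()` and `nextMessage()`; `drainStreams` is likewise an illustrative name, not a library function.

```php
<?php
// StubMessage and StubStream are hypothetical stand-ins so the sketch runs
// without a broker; they only mimic payload() and nextMessage().
final class StubMessage
{
    private $payload;

    public function __construct(string $payload)
    {
        $this->payload = $payload;
    }

    public function payload(): string
    {
        return $this->payload;
    }
}

final class StubStream
{
    private $messages;

    public function __construct(array $payloads)
    {
        $this->messages = array_map(function (string $p) {
            return new StubMessage($p);
        }, $payloads);
    }

    /** Returns the next message, or false once the stream is exhausted. */
    public function nextMessage()
    {
        return $this->messages ? array_shift($this->messages) : false;
    }
}

/** Drain every stream once, pass each message to $handle, return the count. */
function drainStreams(array $streams, callable $handle): int
{
    $count = 0;
    foreach ($streams as $stream) {
        while ($message = $stream->nextMessage()) {
            $handle($message);
            $count++;
        }
    }
    return $count;
}
```

With such a helper, the body of the outer loop reduces to a single call whose return value plays the role of `$fetchCount`.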

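Consumer daemon

Finally, something closer to real use of the library: a consumer that listens for new messages produced to a topic. Unlike the previous consumer, this time we set the offset to the highest possible value, so past messages are ignored and only new ones are picked up.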

// require kafka-php library                                
require "kafka-php/src/Kafka/Kafka.php";                    
                                                            
// setting variables                                        
$connector = "hq-pau-d02:2181";                             
$topic     = "test-topic";                                  
                                                            
// create the connector                                     
$cc = \Kafka\ConsumerConnector::Create($connector);         
                                                            
// create the message stream, we point to the end           
// of the topic offset                                      
$messageStreams = $cc->createMessageStreams(                
    $topic,                                                 
    65535,                                                  
    \Kafka\Kafka::OFFSETS_LATEST                            
);                                                          
                                                            
while (true) {                                              
    $fetchCount = 0;                                        
                                                            
    foreach ($messageStreams as $mid => $messageStream) {   
        // keep getting messages, if we have more           
        while ($message = $messageStream->nextMessage()) {  
            $fetchCount++;                                  
            // just print the payload
            echo "{$message->payload()}\n";                 
        }                                                   
    }                                                       
                                                            
    if ($fetchCount == 0) {                                 
        // no more messages, so sleep and try again         
        sleep(1);                                           
    }                                                       
}                                                           
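
The daemon above follows a simple pattern: fetch, and if nothing arrived, idle before trying again. The sketch below isolates that pattern so that the idle step and the stop condition can be swapped out, for instance in a test. `pollLoop` and its parameters are hypothetical names introduced here, not part of the library.

```php
<?php
// Sketch only: pollLoop() and its parameters are hypothetical.

/**
 * Call $fetch repeatedly; whenever it reports zero messages, call $idle.
 * $maxRounds bounds the loop so it can be tested; pass null to run forever.
 * Returns the total number of messages fetched.
 */
function pollLoop(callable $fetch, callable $idle, ?int $maxRounds = null): int
{
    $total = 0;
    for ($round = 0; $maxRounds === null || $round < $maxRounds; $round++) {
        $fetched = $fetch();
        $total  += $fetched;
        if ($fetched === 0) {
            // nothing new on any stream, so back off before polling again
            $idle();
        }
    }
    return $total;
}
```

In a daemon, `$fetch` would wrap the `foreach` over `$messageStreams` and return the number of messages handled, while `$idle` would be `function () { sleep(1); }`.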

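TODO

The following tasks remain to be done:

  • In progress - ConsumerConnector rebalance process (the zk watcher appears to be buggy, so probably on nextMessage)

  • Pending - try implementing the new versioned wire format 0.8 and acknowledgements

  • Pending - Snappy compression - could not compile snappy.so on 64-bit :)

  • Pending - detect 64-bit PHP and replace the Kafka offset hex with decimal under the hood

  • Pending - profiling & optimization

    • Channel - implement a buffer in hasIncomingData to speed up streaming, and read from that buffer in the read() method
    • Consumer channel - profile consumption (decompression & deserialization cost, flushing of broken response streams)
    • Producer channel - profile production (compression & serialization cost)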
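
For the 64-bit detection item above, one possible approach is to check `PHP_INT_SIZE` and convert between hex and native integers only when they are wide enough. This is a sketch under that assumption; `hexOffsetToInt` and `intToHexOffset` are hypothetical names, and the library may end up solving this differently.

```php
<?php
// Hypothetical helpers illustrating the TODO item; not part of kafka-php.

/** Convert a hex offset to a native integer; requires 64-bit PHP. */
function hexOffsetToInt(string $hex): int
{
    if (PHP_INT_SIZE < 8) {
        throw new RuntimeException("native 64-bit integers are not available");
    }
    if (!ctype_xdigit($hex)) {
        throw new InvalidArgumentException("offset must be a hex string");
    }
    $value = hexdec($hex);
    if (!is_int($value)) {
        // hexdec() falls back to float when the value exceeds PHP_INT_MAX
        throw new OverflowException("offset does not fit in a signed 64-bit integer");
    }
    return $value;
}

/** Convert a non-negative integer offset to a 16-digit hex string. */
function intToHexOffset(int $offset): string
{
    if ($offset < 0) {
        throw new InvalidArgumentException("offset must be non-negative");
    }
    return sprintf("%016x", $offset);
}
```

On 32-bit builds the first function refuses to run, so callers could keep using the hex representation there.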

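Appendix - Compiling the php-zookeeper extension from source for Apache2 on Ubuntu

First install the tools for compiling C sources and the automake toolchain, if you do not have them yet: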

sudo apt-get install build-essential checkinstall libcppunit-dev autoconf automake libtool ant

Then compile libzookeeper from the C sources:

sudo git clone git://github.com/apache/zookeeper.git /usr/share/zookeeper
cd /usr/share/zookeeper/
sudo ant compile_jute
cd src/c
ACLOCAL="aclocal -I /usr/local/share/aclocal" sudo autoreconf -if
# OR, if the aclocal macros are under /usr/share/aclocal:
ACLOCAL="aclocal -I /usr/share/aclocal" sudo autoreconf -if
sudo ./configure
sudo make
sudo make install

Clone the php-zookeeper sources and build the PHP extension with phpize:

sudo apt-get install php5-dev
sudo git clone git://github.com/andreiz/php-zookeeper.git /usr/share/php-zookeeper
cd /usr/share/php-zookeeper
sudo git checkout v0.2.1
sudo phpize
sudo ./configure
sudo make
sudo make install
echo "extension=zookeeper.so" | sudo tee /etc/php5/cli/conf.d/zookeeper.ini
echo "extension=zookeeper.so" | sudo tee /etc/php5/apache2/conf.d/zookeeper.ini

Test that it works on the CLI, and restart Apache!

echo '<?php $zoo = new Zookeeper("localhost:2181"); print_r($zoo->getChildren("/"));' | php
sudo service apache2 restart