rossriley / kafka-php
Kafka consumer and producer library for PHP
Requires
- php: >=5.4
This package is not auto-updated.
Last update: 2024-09-14 15:48:17 UTC
README
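This is an alternative to the existing Kafka PHP client, which is in the incubator. The main motivation for writing it was that fetch responses should not be loaded entirely into memory but pulled continuously from the socket. In addition, PHP has a different control flow and communication pattern (each request is a thread within the HTTP server), so the API does not need to follow the Scala/Java object graph and can be simpler.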
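To illustrate the streaming approach, here is a minimal sketch of pulling messages off a stream one at a time, so only the current message is held in memory. It assumes each message is preceded by a 4-byte big-endian size, as in Kafka 0.7 message sets. `readExactly()` and `nextMessagePayload()` are hypothetical helpers, not kafka-php's API, and `php://memory` stands in for the broker socket.

```php
<?php
// Hypothetical helpers, not part of kafka-php: read one length-prefixed
// message at a time instead of buffering the whole fetch response.
// Assumes a 4-byte big-endian size before each message (Kafka 0.7 style).

// Read exactly $length bytes, or return false if the stream ends first.
function readExactly($stream, $length)
{
    $data = '';
    while (strlen($data) < $length) {
        $chunk = fread($stream, $length - strlen($data));
        if ($chunk === false || $chunk === '') {
            return false; // end of stream (a blocking stream is assumed)
        }
        $data .= $chunk;
    }
    return $data;
}

// Return the next message payload, or false when no complete message is left.
function nextMessagePayload($stream)
{
    $header = readExactly($stream, 4);
    if ($header === false) {
        return false;
    }
    $size = unpack('N', $header);
    return readExactly($stream, $size[1]);
}

// Usage: php://memory stands in for the broker socket.
$stream = fopen('php://memory', 'r+');
foreach (array('first', 'second') as $payload) {
    fwrite($stream, pack('N', strlen($payload)) . $payload);
}
rewind($stream);
while (($payload = nextMessagePayload($stream)) !== false) {
    echo $payload . "\n"; // prints "first", then "second"
}
fclose($stream);
```

Because each call reads only one header and one payload, memory use is bounded by the largest single message rather than by the size of the whole fetch response.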
There are some differences from the existing Kafka PHP client:
- Streaming messages individually rather than loading the whole response into memory
- Offsets implemented by hexadecimal transformation to fully support Kafka long (64-bit) offsets
- Gzip working correctly both ways, including the pre-compression message header
- Messages produced in a batch are consumed correctly in both compressed and uncompressed state
- CRC32 check working
- Producers and Consumers are abstracted to allow for changes in the Kafka API without disrupting client code
- Broker abstraction for different connection strategies
- OffsetRequest workaround for 64-bit Unix timestamps
- Produce request only checks that the correct number of bytes was sent (acks not available)
- Producer compresses batches of consecutive messages with the same compression codec as a single message
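The CRC32 check listed above can be sketched as follows for a single message body. This assumes the Kafka 0.7 message layout (a magic byte, an attributes byte only when magic is 1, a 4-byte big-endian CRC32 of the payload, then the payload); `verifyMessageCrc()` is a hypothetical helper, not the library's code. Comparing hex strings from `hash('crc32b', $payload)` sidesteps the negative integers `crc32()` can return on 32-bit PHP.

```php
<?php
// Hypothetical helper, not kafka-php's code. Assumed Kafka 0.7 message layout:
//   magic (1 byte) | attributes (1 byte, only when magic == 1)
//   | CRC32 of the payload (4 bytes, big-endian) | payload
function verifyMessageCrc($message)
{
    if (strlen($message) < 1) {
        return false;
    }
    $crcOffset = (ord($message[0]) === 1) ? 2 : 1;
    if (strlen($message) < $crcOffset + 4) {
        return false; // too short to contain a checksum
    }
    // Compare as hex strings to avoid signed 32-bit integers on 32-bit PHP.
    $expected = bin2hex(substr($message, $crcOffset, 4));
    $payload = (string) substr($message, $crcOffset + 4);
    return hash('crc32b', $payload) === $expected;
}

// Usage: build a magic-0 message for "Hello world!" and check it.
$payload = 'Hello world!';
$message = chr(0) . pack('N', crc32($payload)) . $payload;
var_dump(verifyMessageCrc($message));                      // bool(true)
var_dump(verifyMessageCrc(substr($message, 0, -1) . '?')); // bool(false)
```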
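Console scripts
There is a set of example scripts in the 'scripts' folder. All scripts share the same argument conventions, although different scripts accept different arguments.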
-c Connector: the Zookeeper server you want to connect to
-b Broker: the Kafka broker you want to connect to
-t Topic: the topic you want to produce to (or consume from)
-m Message: the message you want to produce
-l List: lists the available topics
-o Offset (optional): the starting offset from which to consume
-h Help: displays the help for the script
The following scripts are available:
./scripts/simple/producer -b {broker} -t {topic}
./scripts/simple/producer -b hq-pau-d02:9092 -t test-topic
./scripts/simple/consumer -b {broker} -t {topic} [-o {offset}]
./scripts/simple/consumer -b hq-pau-d02:9092 -t test-topic
./scripts/producers/producer -c {connector} -t {topic} -m {message}
./scripts/producers/producer -c hq-pau-d02:2181 -t test-topic -m "Hello"
./scripts/producers/cached -c {connector} -t {topic} -m {message}
./scripts/producers/cached -c hq-pau-d02:2181 -t test-topic -m "Hello"
./scripts/producers/partitioned -c {connector} -t {topic} -m {message}
./scripts/producers/partitioned -c hq-pau-d02:2181 -t test-topic -m "Hello"
./scripts/producers/daemon -c {connector} -t {topic}
./scripts/producers/daemon -c hq-pau-d02:2181 -t test-topic
./scripts/consumers/consumer -c {connector} -t {topic}
./scripts/consumers/consumer -c hq-pau-d02:2181 -t test-topic
./scripts/consumers/daemon -c {connector} -t {topic}
./scripts/consumers/daemon -c hq-pau-d02:2181 -t test-topic
Unit tests
The tests are a set of native PHP assert() calls included in a main runner:
$> ./test
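Tutorial
This is not a full tutorial; it only shows how to create simple producers and consumers, to illustrate how to use the kafka-php library.
Simple producer
This code sends a message to the given topic.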
// require kafka-php library
require "kafka-php/src/Kafka/Kafka.php";
$connector = "hq-pau-d02:2181";
$topic = "test-topic";
$message = "Hello world!";
$producer = \Kafka\ProducerConnector::Create($connector);
// add the message
$producer->addMessage($topic, $message);
// produce the actual messages into kafka
$producer->produce();
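The feature list notes that the producer compresses batches of consecutive messages sharing a compression codec into a single message. Below is a minimal sketch of that grouping step over a list of pending messages; treating this as what happens between `addMessage()` and `produce()` is an assumption about the library's internals. `groupByCodec()` and `encodeBatch()` are hypothetical, the codec names are illustrative strings, and the inner framing is simplified to size + payload (real Kafka messages also carry a magic byte and a CRC32).

```php
<?php
// Hypothetical sketch, not kafka-php's code: group consecutive pending
// messages that share a codec so each run can be sent as one wrapper message.
// Codec names ('none', 'gzip') are illustrative strings.
function groupByCodec(array $pending)
{
    $batches = array();
    foreach ($pending as $item) {
        list($codec, $payload) = $item;
        $last = count($batches) - 1;
        if ($last >= 0 && $batches[$last]['codec'] === $codec) {
            $batches[$last]['payloads'][] = $payload; // extend the current run
        } else {
            $batches[] = array('codec' => $codec, 'payloads' => array($payload));
        }
    }
    return $batches;
}

// Simplified framing: 4-byte size + payload for each inner message, then
// gzip the whole run when the codec asks for it (requires ext/zlib).
function encodeBatch(array $batch)
{
    $set = '';
    foreach ($batch['payloads'] as $payload) {
        $set .= pack('N', strlen($payload)) . $payload;
    }
    return $batch['codec'] === 'gzip' ? gzencode($set) : $set;
}

// Usage: three runs -> gzip(a, b), none(c), gzip(d)
$batches = groupByCodec(array(
    array('gzip', 'a'), array('gzip', 'b'), array('none', 'c'), array('gzip', 'd'),
));
foreach ($batches as $batch) {
    echo $batch['codec'] . ': ' . count($batch['payloads']) . " message(s)\n";
}
```

Only consecutive messages are merged, so the original message order is preserved across batches.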
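Simple consumer
This shows how to create a consumer and consume a single message. It is not very useful on its own, but it illustrates the idea.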
// require kafka-php library
require "kafka-php/src/Kafka/Kafka.php";
// setting variables
$connector = "hq-pau-d02:2181";
$topic = "test-topic";
// create the connector
$cc = \Kafka\ConsumerConnector::Create($connector);
// create the message streams, pointing at the beginning
// of the topic (earliest offset)
$messageStreams = $cc->createMessageStreams(
    $topic,
    65535,
    \Kafka\Kafka::OFFSETS_EARLIEST
);
// get the next message from the first stream
$message = $messageStreams[0]->nextMessage();
// output the message, if one was available
if ($message) {
    echo $message->payload() . "\n";
}
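Consuming all messages from a given topic
This consumer does something similar, but consumes all messages of a given topic, starting from the beginning (the earliest available offset).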
// require kafka-php library
require "kafka-php/src/Kafka/Kafka.php";
// setting variables
$connector = "hq-pau-d02:2181";
$topic = "test-topic";
// create the connector
$cc = \Kafka\ConsumerConnector::Create($connector);
// create the message streams, pointing at the beginning
// of the topic (earliest offset)
$messageStreams = $cc->createMessageStreams(
    $topic,
    65535,
    \Kafka\Kafka::OFFSETS_EARLIEST
);
// keep fetching until a full pass over all streams returns no messages
while (true) {
    $fetchCount = 0;
    foreach ($messageStreams as $messageStream) {
        while ($message = $messageStream->nextMessage()) {
            $fetchCount++;
            echo $message->payload() . "\n";
        }
    }
    if ($fetchCount == 0) {
        echo " --- no more messages ---\n";
        die;
    }
}
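Consumer daemon
Finally, something closer to real-world use of the library: a consumer that listens for new messages produced to the topic. Unlike the previous consumer, this time we start from the highest possible offset, so past messages are ignored and only new ones are picked up.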
// require kafka-php library
require "kafka-php/src/Kafka/Kafka.php";
// setting variables
$connector = "hq-pau-d02:2181";
$topic = "test-topic";
// create the connector
$cc = \Kafka\ConsumerConnector::Create($connector);
// create the message streams, pointing at the end
// of the topic (latest offset), so only new messages arrive
$messageStreams = $cc->createMessageStreams(
    $topic,
    65535,
    \Kafka\Kafka::OFFSETS_LATEST
);
while (true) {
    $fetchCount = 0;
    foreach ($messageStreams as $messageStream) {
        // keep getting messages while there are more
        while ($message = $messageStream->nextMessage()) {
            $fetchCount++;
            // just print the payload
            echo $message->payload() . "\n";
        }
    }
    if ($fetchCount == 0) {
        // no new messages yet, so sleep and try again
        sleep(1);
    }
}
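The daemon above starts from `\Kafka\Kafka::OFFSETS_LATEST`. In the Kafka offset API, a client asks for the latest or earliest offset by sending the special time values -1 or -2 in an OffsetRequest, whose time field is a signed 64-bit integer. That is awkward on 32-bit PHP, and `pack()` has no 64-bit format code before PHP 5.6.3. The feature list mentions a workaround for this field; the sketch below shows one possible approach, not the library's. `packInt64BE()` is a hypothetical helper that builds the 8 big-endian bytes with float arithmetic, using two's complement for negative values.

```php
<?php
// Hypothetical helper, not kafka-php's code: encode a signed 64-bit value as
// 8 big-endian bytes without pack('J') and without needing 64-bit ints.
// Magnitudes must stay below 2^53 so float arithmetic is exact; millisecond
// timestamps and the special values -1 (latest) and -2 (earliest) qualify.
function packInt64BE($value)
{
    $negative = $value < 0;
    // Two's complement: -n is the bitwise NOT of (n - 1).
    $magnitude = $negative ? -$value - 1 : $value;
    $bytes = '';
    for ($i = 0; $i < 8; $i++) {
        $byte = (int) fmod($magnitude, 256);      // lowest remaining byte
        $magnitude = floor($magnitude / 256);
        $bytes = chr($negative ? $byte ^ 0xFF : $byte) . $bytes;
    }
    return $bytes;
}

// Usage: the time field for "earliest", "latest" and a millisecond timestamp.
echo bin2hex(packInt64BE(-2)) . "\n";            // fffffffffffffffe
echo bin2hex(packInt64BE(-1)) . "\n";            // ffffffffffffffff
echo bin2hex(packInt64BE(1400000000000)) . "\n"; // 00000145f680b000
```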
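TODO
The following tasks remain to be done:
- IN PROGRESS - ConsumerConnector rebalance procedure (zk watchers seem to be buggy, so maybe do it on nextMessage)
- TODO - try to implement the new versioned 0.8 wire format and acks
- TODO - Snappy compression - couldn't compile snappy.so on 64-bit :)
- TODO - detect 64-bit PHP and, under the hood, replace the hexadecimal Kafka offsets with decimal ones
- TODO - profiling & optimization
  - Channel - implement a buffer in hasIncomingData to speed up streaming, and read from that buffer in the read() method
  - Consumer channel - profile consuming (decompression & deserialization cost, flushing broken response streams)
  - Producer channel - profile producing (compression & serialization cost)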
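One way the "detect 64-bit PHP" item could look is sketched below: decode the 8-byte big-endian offset into a native integer when `PHP_INT_SIZE` is 8, and otherwise keep it as a hexadecimal string and advance it with digit-by-digit string arithmetic. `decodeOffset()`, `addToHexOffset()` and `advanceOffset()` are hypothetical names, not existing kafka-php functions; the sketch relies on Kafka 0.7 offsets being byte positions, so advancing means adding the number of bytes consumed.

```php
<?php
// Hypothetical sketch for the TODO item above; not kafka-php's code.

// Decode an 8-byte big-endian offset: native int on 64-bit PHP,
// 16-character hex string on 32-bit PHP (where it may not fit in an int).
function decodeOffset($bytes)
{
    if (PHP_INT_SIZE === 8) {
        $parts = unpack('Nhigh/Nlow', $bytes);
        return ($parts['high'] << 32) | $parts['low'];
    }
    return bin2hex($bytes);
}

// Add a non-negative int to a hex offset string, one hex digit at a time.
// A carry out of the most significant digit is dropped.
function addToHexOffset($hexOffset, $increment)
{
    $digits = str_split(strtolower($hexOffset));
    $carry = $increment;
    for ($i = count($digits) - 1; $i >= 0 && $carry > 0; $i--) {
        $sum = hexdec($digits[$i]) + $carry;
        $digits[$i] = dechex($sum % 16);
        $carry = $sum >> 4;
    }
    return implode('', $digits);
}

// Advance an offset by the number of bytes consumed, whichever form it has.
function advanceOffset($offset, $increment)
{
    return is_int($offset) ? $offset + $increment : addToHexOffset($offset, $increment);
}

// Usage: the same offset in both representations.
var_dump(advanceOffset('00000000000000ff', 1)); // string(16) "0000000000000100"
var_dump(advanceOffset(255, 1));                // int(256)
```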
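Appendix - compiling the php-zookeeper extension from source for Apache2 on Ubuntu
First, install the tools for compiling C sources and the automake tools, if you don't have them already: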
sudo apt-get install build-essential checkinstall libcppunit-dev autoconf automake libtool ant
Then compile libzookeeper from the C sources:
sudo git clone https://github.com/apache/zookeeper.git /usr/share/zookeeper
cd /usr/share/zookeeper/
sudo ant compile_jute
cd src/c
sudo env ACLOCAL="aclocal -I /usr/local/share/aclocal" autoreconf -if
# OR, if the aclocal macros are installed under /usr/share:
sudo env ACLOCAL="aclocal -I /usr/share/aclocal" autoreconf -if
sudo ./configure
sudo make
sudo make install
Clone the php-zookeeper sources and build the PHP extension with phpize:
sudo apt-get install php5-dev
sudo git clone https://github.com/andreiz/php-zookeeper.git /usr/share/php-zookeeper
cd /usr/share/php-zookeeper
sudo git checkout v0.2.1
sudo phpize
sudo ./configure
sudo make
sudo make install
echo "extension=zookeeper.so" | sudo tee /etc/php5/cli/conf.d/zookeeper.ini
echo "extension=zookeeper.so" | sudo tee /etc/php5/apache2/conf.d/zookeeper.ini
Test that it works on the CLI, then restart Apache:
echo '<?php $zoo = new Zookeeper("localhost:2181"); print_r($zoo->getChildren("/"));' | php
sudo service apache2 restart