jenner/kafka-php

A Kafka client for PHP

0.2.2 2015-09-08 02:06 UTC

This package is auto-updated.

Last update: 2024-09-14 11:55:53 UTC


README


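Kafka-php is a PHP client for Apache Kafka with Zookeeper integration. It only supports the latest version of Kafka 0.8, which is still under development, so this module is not yet production ready.

The Zookeeper integration performs the following tasks:

  • Loads broker metadata from Zookeeper before the client can communicate with the Kafka servers
  • Watches broker state; if the brokers change, the client refreshes the broker and topic metadata it has stored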

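Requirements

Installation

Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).

Produce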

\Kafka\Produce::getInstance($hostList, $timeout)

  • hostList: Zookeeper host list, for example 127.0.0.1:2181,192.168.1.114:2181
  • timeout: Zookeeper timeout
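The host list format described above can be illustrated with a small helper. This is a minimal sketch and not part of kafka-php: `parseHostList` is a hypothetical name, and falling back to port 2181 (the standard Zookeeper client port) for entries without a port is an assumption about how one might want to treat them.

```php
<?php
// Hypothetical helper (not part of kafka-php): split a Zookeeper host list
// such as "127.0.0.1:2181,192.168.1.114:2181" into host/port pairs.
function parseHostList($hostList, $defaultPort = 2181)
{
    $hosts = array();
    foreach (explode(',', $hostList) as $entry) {
        $entry = trim($entry);
        if ($entry === '') {
            continue; // skip empty segments such as a trailing comma
        }
        $parts = explode(':', $entry, 2);
        // Assumption: entries without an explicit port use $defaultPort.
        $port = isset($parts[1]) ? (int) $parts[1] : $defaultPort;
        $hosts[] = array('host' => $parts[0], 'port' => $port);
    }
    return $hosts;
}

$hosts = parseHostList('127.0.0.1:2181,192.168.1.114:2181');
```

The library itself accepts the raw string; a helper like this is only useful for validating configuration before handing it over.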

\Kafka\Produce::setRequireAck($ack = -1)

  • ack: how many acknowledgements the servers should receive before responding to the request.

\Kafka\Produce::setMessages($topicName, $partitionId, $messages)

  • topicName: the topic that data is published to.
  • partitionId: the partition that data is published to.
  • messages: [Array] the messages to publish.

\Kafka\Produce::send()

Sends the message sets to the server.

Example

$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);

$produce->setRequireAck(-1);
$produce->setMessages('test', 0, array('test1111111'));
$produce->setMessages('test6', 0, array('test1111111'));
$produce->setMessages('test6', 2, array('test1111111'));
$produce->setMessages('test6', 1, array('test111111111133'));
$result = $produce->send();
var_dump($result);

Consumer

\Kafka\Consumer::getInstance($hostList, $timeout)

  • hostList: Zookeeper host list, for example 127.0.0.1:2181,192.168.1.114:2181
  • timeout: Zookeeper timeout

\Kafka\Consumer::setGroup($groupName)

  • groupName: the consumer group to use.

\Kafka\Consumer::setPartition($topicName, $partitionId, $offset = 0)

  • topicName: the topic that data is fetched from.
  • partitionId: the partition that data is fetched from.
  • offset: the offset to start fetching from. Defaults to 0.

\Kafka\Consumer::fetch()

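Returns an iterator over the fetched messages: \Kafka\Protocol\Fetch\Topic

\Kafka\Protocol\Fetch\Topic

This object is an iterator.

key: topic name; value: \Kafka\Protocol\Fetch\Partition

\Kafka\Protocol\Fetch\Partition

This object is an iterator.

key: partition ID; value: MessageSet object

\Kafka\Protocol\Fetch\Partition::getErrCode()

Returns the error code of the partition fetch.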

\Kafka\Protocol\Fetch\Partition::getHighOffset()

Returns the partition's high offset as reported in the fetch response.

\Kafka\Protocol\Fetch\MessageSet

This object is an iterator over \Kafka\Protocol\Fetch\Message objects.

Example

$consumer = \Kafka\Consumer::getInstance('localhost:2181');

$consumer->setGroup('testgroup');
$consumer->setPartition('test', 0);
$consumer->setPartition('test6', 2, 10);
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
    foreach ($topic as $partId => $partition) {
        var_dump($partition->getHighOffset());
        foreach ($partition as $message) {
            var_dump((string)$message);
        }
    }
}
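The nested iteration above (topic, then partition, then message) can be wrapped in a helper that flattens a fetch result into plain rows. This is a minimal sketch, not part of kafka-php: `collectMessages` is a hypothetical name, and the `ArrayIterator` objects only stand in for the real `\Kafka\Protocol\Fetch\*` iterators so that the snippet runs without a broker.

```php
<?php
// Hypothetical helper (not part of kafka-php): flatten a fetch result into
// rows. It only relies on the three levels being iterable and on messages
// being castable to string, as in the example above.
function collectMessages($result)
{
    $rows = array();
    foreach ($result as $topicName => $topic) {
        foreach ($topic as $partitionId => $partition) {
            foreach ($partition as $message) {
                $rows[] = array(
                    'topic'     => $topicName,
                    'partition' => $partitionId,
                    'payload'   => (string) $message,
                );
            }
        }
    }
    return $rows;
}

// Stand-in data: ArrayIterator replaces the real fetch iterators here.
$fake = new ArrayIterator(array(
    'test' => new ArrayIterator(array(
        0 => new ArrayIterator(array('m1', 'm2')),
    )),
));
$rows = collectMessages($fake);
```

With a real consumer, the value returned by `$consumer->fetch()` would be passed in place of `$fake`.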

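Basic Protocol

Produce API

The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

\Kafka\Protocol\Encoder::produceRequest

Parameter structure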

array(
    'required_ack' => 1,
        // This field indicates how many acknowledgements the servers should receive before responding to the request. default `0`
        // If it is 0 the server will not send any response
        // If it is -1 the server will block until the message is committed by all in sync replicas before sending a response 
        // For any number > 1 the server will block waiting for this number of acknowledgements to occur
    'timeout' => 1000,
        // This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks.
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic that data is being published to.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition that data is being published to.
                    'messages' => array(
                        'message1', 
                        // [String] message
                    ),
                ),
            ),
        ),
    ),
);
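Because a single request can carry message sets for many topic partitions, it can help to group a flat list of records into the nested structure shown above. The following is a minimal sketch under that assumption; `buildProduceRequest` is a hypothetical helper, not a kafka-php function, and it only produces the array, it does not send anything.

```php
<?php
// Hypothetical helper (not part of kafka-php): group flat
// (topic, partition, message) records into the produceRequest structure.
function buildProduceRequest(array $records, $requiredAck = 1, $timeout = 1000)
{
    // First pass: bucket messages by topic and partition.
    $grouped = array();
    foreach ($records as $record) {
        list($topic, $partitionId, $message) = $record;
        $grouped[$topic][$partitionId][] = $message;
    }

    // Second pass: convert the buckets into the nested list layout.
    $data = array();
    foreach ($grouped as $topic => $partitions) {
        $partitionList = array();
        foreach ($partitions as $partitionId => $messages) {
            $partitionList[] = array(
                'partition_id' => $partitionId,
                'messages'     => $messages,
            );
        }
        $data[] = array(
            'topic_name' => (string) $topic,
            'partitions' => $partitionList,
        );
    }

    return array(
        'required_ack' => $requiredAck,
        'timeout'      => $timeout,
        'data'         => $data,
    );
}

$request = buildProduceRequest(array(
    array('test', 0, 'a'),
    array('test6', 2, 'b'),
    array('test', 0, 'c'),
));
```

The resulting array has the same shape as the parameter structure above, so it could be handed to `produceRequest` in the same way as the hand-written arrays in this README.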

Return

Array

Example

$data = array(
    'required_ack' => 1,
    'timeout' => 1000,
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'messages' => array(
                        'message1',
                        'message2',
                    ),
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->produceRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->produceResponse();
var_dump($result);

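Fetch API

The fetch API is used to fetch a chunk of one or more logs for some topic partitions. Logically, one specifies the topics, partitions, and starting offset at which to begin the fetch, and gets back a chunk of messages.

\Kafka\Protocol\Encoder::fetchRequest

Parameter structure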

array(
    'replica_id' => -1,
        // The replica id indicates the node id of the replica initiating this request. default `-1`
    'max_wait_time' => 100,
        // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. default 100 ms.
    'min_bytes' => 64 * 1024, // 64k
        // This is the minimum number of bytes of messages that must be available to give a response. default 64k.
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to fetch from.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to fetch from.
                    'offset' => 0,
                        // The offset to begin this fetch from. default 0
                    'max_bytes' => 100 * 1024 * 1024,
                        // The maximum number of bytes to include in the message set for this partition. default 100Mb
                ),
            ),
        ),
    ),
);
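The defaults named in the comments above (replica id -1, 100 ms max wait, 64k min bytes, 100Mb max bytes per partition) can be filled in by a small builder. This is a minimal sketch; `buildFetchRequest` and its topic => (partition => offset) input format are assumptions made for illustration and are not part of kafka-php.

```php
<?php
// Hypothetical helper (not part of kafka-php): build a fetchRequest array
// from a map of topic => (partition id => start offset), using the default
// values stated in the parameter structure above.
function buildFetchRequest(array $partitions, array $options = array())
{
    $defaults = array(
        'replica_id'    => -1,
        'max_wait_time' => 100,       // milliseconds
        'min_bytes'     => 64 * 1024, // 64k
    );
    // Only keys that exist in $defaults may be overridden by $options.
    $request = array_merge($defaults, array_intersect_key($options, $defaults));

    $request['data'] = array();
    foreach ($partitions as $topic => $offsets) {
        $list = array();
        foreach ($offsets as $partitionId => $offset) {
            $list[] = array(
                'partition_id' => $partitionId,
                'offset'       => $offset,
                'max_bytes'    => 100 * 1024 * 1024, // 100Mb
            );
        }
        $request['data'][] = array(
            'topic_name' => (string) $topic,
            'partitions' => $list,
        );
    }
    return $request;
}

$request = buildFetchRequest(
    array('test' => array(0 => 0), 'test6' => array(2 => 10)),
    array('max_wait_time' => 500)
);
```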

Return

\Kafka\Protocol\Fetch\Topic iterator

Example

$data = array(
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'offset' => 0, 
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchResponse();
var_dump($result);

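Offset API

This API describes the valid offset range available for a set of topic partitions. As with the produce and fetch APIs, requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.

\Kafka\Protocol\Encoder::offsetRequest

Parameter structure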

array(
    'replica_id' => -1,
        // The replica id indicates the node id of the replica initiating this request. default `-1`
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to query offsets for.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to query offsets for.
                    'time' => -1,
                        // Used to ask for all messages before a certain time (ms). 
                        // Specify -1 to receive the latest offsets
                        // Specify -2 to receive the earliest available offset. 
                    'max_offset' => 1, 
                        // max return offset element. default 10000.
                ),
            ),
        ),
    ),
);
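The special `time` values -1 (latest) and -2 (earliest) are easy to mix up, so naming them may help. The following is a minimal sketch; the constants `OFFSET_LATEST` and `OFFSET_EARLIEST` and the function `buildOffsetRequest` are hypothetical names introduced here, not part of kafka-php.

```php
<?php
// Hypothetical names (not part of kafka-php) for the special 'time' values
// documented in the parameter structure above.
const OFFSET_LATEST = -1;   // receive the latest offsets
const OFFSET_EARLIEST = -2; // receive the earliest available offset

// Hypothetical helper: build an offsetRequest array for one topic partition.
function buildOffsetRequest($topic, $partitionId, $time = OFFSET_LATEST, $maxOffset = 1)
{
    return array(
        'replica_id' => -1,
        'data' => array(
            array(
                'topic_name' => $topic,
                'partitions' => array(
                    array(
                        'partition_id' => $partitionId,
                        'time'         => $time,
                        'max_offset'   => $maxOffset,
                    ),
                ),
            ),
        ),
    );
}

$earliest = buildOffsetRequest('test', 0, OFFSET_EARLIEST);
$latest   = buildOffsetRequest('test', 0);
```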

Return

Array

Example

$data = array(
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'max_offset' => 10, 
                    'time' => -1, 
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->offsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->offsetResponse();
var_dump($result);

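Metadata API

The metadata returned is at the partition level, but grouped by topic for convenience and to avoid redundancy. For each partition, the metadata contains the information for the leader as well as for all the replicas, plus the list of replicas that are currently in sync.

\Kafka\Protocol\Encoder::metadataRequest

Parameter structure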

array(
   'topic_name1', // topic name
);

Return

Array

Example

$data = array(
    'test'
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->metadataRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->metadataResponse();
var_dump($result);

Offset Commit API

These APIs allow for centralized management of offsets.

\Kafka\Protocol\Encoder::commitOffsetRequest

Parameter structure

array(
    'group_id' => 'testgroup',
        // consumer group 
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to commit offsets for.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to commit the offset for.
                    'offset' => 0,
                        // The offset to commit.
                    'time' => -1, 
                        // If the time stamp field is set to -1, then the broker sets the time stamp to the receive time before committing the offset.
                ),
            ),
        ),
    ),
);

Return

Array

Example

$data = array(
    'group_id' => 'testgroup',
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'offset' => 2, 
                ),
            ),
        ),
    ),
);


$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->commitOffsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->commitOffsetResponse();
var_dump($result);

Offset Fetch API

These APIs allow for centralized management of offsets.

\Kafka\Protocol\Encoder::fetchOffsetRequest

Parameter structure

array(
    'group_id' => 'testgroup',
        // consumer group 
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to fetch committed offsets for.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to fetch the committed offset for.
                ),
            ),
        ),
    ),
);

Return

Array

Example

$data = array(
    'group_id' => 'testgroup',
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                ),
            ),
        ),
    ),
);


$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchOffsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchOffsetResponse();
var_dump($result);