efisuby/php-kafka

Kafka client for PHP

v0.1.8 2016-01-19 14:45 UTC

Last update: 2024-09-12 00:04:05 UTC


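Kafka-php is a PHP client for Apache Kafka with Zookeeper integration. It only supports the latest version of Kafka 0.8, which is still under development, so this module is not yet ready for production use.

The Zookeeper integration performs the following tasks:

  • Loads broker metadata from Zookeeper before the client can communicate with the Kafka servers
  • Watches broker state; if the brokers change, the client refreshes the broker and topic metadata it has stored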
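Kafka 0.8 brokers register themselves in Zookeeper under /brokers/ids/<id>, storing a small JSON document that includes "host" and "port" fields. The sketch below shows how such registrations can be turned into a broker list. It only covers the parsing step. Reading the znodes requires a Zookeeper client, which is not shown. The function name brokersFromRegistrations and the sample data are hypothetical, not part of kafka-php.

```php
<?php
// Sketch: map Kafka 0.8 broker registrations (the JSON stored in the
// Zookeeper znodes /brokers/ids/<id>) to broker id => "host:port".
// Fetching the znodes needs a Zookeeper client and is not shown here;
// brokersFromRegistrations() and $znodes are hypothetical.

function brokersFromRegistrations(array $znodes): array
{
    $brokers = [];
    foreach ($znodes as $id => $json) {
        $info = json_decode($json, true);
        if (!is_array($info) || !isset($info['host'], $info['port'])) {
            continue; // skip malformed or incomplete registrations
        }
        $brokers[(int) $id] = $info['host'] . ':' . $info['port'];
    }
    ksort($brokers); // stable order by broker id
    return $brokers;
}

$znodes = [
    '1' => '{"jmx_port":-1,"timestamp":"1453214700000","host":"192.168.1.114","version":1,"port":9092}',
    '0' => '{"jmx_port":-1,"timestamp":"1453214700000","host":"127.0.0.1","version":1,"port":9092}',
];
print_r(brokersFromRegistrations($znodes));
```

When a broker's registration changes, the watch described above would rerun this kind of parsing and refresh the cached metadata.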

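Requirements

Installation

Add the lib directory to your PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).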

Composer installation

If you use Composer to manage your project's dependencies, add nmred/kafka-php to your project's composer.json file. Here is an example composer.json:

{
	"require": {
		"nmred/kafka-php": "0.1.*"
	}
}

Produce

\Kafka\Produce::getInstance($hostList, $timeout)

  • hostList : comma-separated list of Zookeeper hosts, e.g. 127.0.0.1:2181,192.168.1.114:2181
  • timeout : Zookeeper timeout

\Kafka\Produce::setRequireAck($ack = -1)

  • ack : how many acknowledgements the server should receive before responding to the request.

\Kafka\Produce::setMessages($topicName, $partitionId, $messages)

  • topicName : the topic the data is published to.
  • partitionId : the partition the data is published to.
  • messages : [array] the messages to publish.

\Kafka\Produce::send()

Sends the message sets to the server.

Example

require 'vendor/autoload.php'; // Composer autoloader (adjust the path to your project)

$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);

$produce->setRequireAck(-1);
$produce->setMessages('test', 0, array('test1111111'));
$produce->setMessages('test6', 0, array('test1111111'));
$produce->setMessages('test6', 2, array('test1111111'));
$produce->setMessages('test6', 1, array('test111111111133'));
$result = $produce->send();
var_dump($result);

Consumer

\Kafka\Consumer::getInstance($hostList, $timeout)

  • hostList : comma-separated list of Zookeeper hosts, e.g. 127.0.0.1:2181,192.168.1.114:2181
  • timeout : Zookeeper timeout

\Kafka\Consumer::setGroup($groupName)

  • groupName : the consumer group to use.

\Kafka\Consumer::setPartition($topicName, $partitionId, $offset = 0)

  • topicName : the topic to fetch data from.
  • partitionId : the partition to fetch data from.
  • offset : the offset to start fetching from. Default 0.

\Kafka\Consumer::fetch()

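Returns an iterator over the fetched messages (\Kafka\Protocol\Fetch\Topic).

\Kafka\Protocol\Fetch\Topic

An iterator object.

key: topic name; value: \Kafka\Protocol\Fetch\Partition

\Kafka\Protocol\Fetch\Partition

An iterator object.

key: partition ID; value: MessageSet object

\Kafka\Protocol\Fetch\Partition::getErrCode()

Returns the fetch error code for the partition.

\Kafka\Protocol\Fetch\Partition::getHighOffset()

Returns the partition's high offset (the high watermark reported in the fetch response).

\Kafka\Protocol\Fetch\MessageSet

An iterator object whose values are \Kafka\Protocol\Fetch\Message objects.

Example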

require 'vendor/autoload.php'; // Composer autoloader (adjust the path to your project)

$consumer = \Kafka\Consumer::getInstance('localhost:2181', 3000);

$consumer->setGroup('testgroup');
$consumer->setPartition('test', 0);
$consumer->setPartition('test6', 2, 10);
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
    foreach ($topic as $partId => $partition) {
        var_dump($partition->getHighOffset());
        foreach ($partition as $message) {
            var_dump((string)$message);
        }
    }
}

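Basic Protocol

Produce API

The produce API is used to send message sets to the server. For efficiency, it allows message sets for many topic partitions to be sent in a single request.

\Kafka\Protocol\Encoder::produceRequest

Parameter structure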
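On the wire, each partition's messages travel as a Kafka 0.8 MessageSet. Each entry is an int64 offset and an int32 size, followed by a message. The message holds a CRC32, a magic byte, an attributes byte, a length-prefixed key and a length-prefixed value, where a length of -1 means null. The sketch below encodes an uncompressed message set by hand to show that layout. It is an illustration under those assumptions, not the library's Encoder. It assumes 64-bit PHP 7.1+ (for pack('J') and nullable types), and the function names are hypothetical.

```php
<?php
// Sketch of the Kafka 0.8 (magic byte 0) MessageSet layout:
//   MessageSet => [Offset:int64 MessageSize:int32 Message]
//   Message    => Crc:int32 MagicByte:int8 Attributes:int8 Key:bytes Value:bytes
// "bytes" = int32 length followed by the data; length -1 means null.
// Hypothetical helpers; assumes 64-bit PHP 7.1+.

function encodeBytes(?string $data): string
{
    if ($data === null) {
        return pack('N', 0xFFFFFFFF); // length -1 marks a null key/value
    }
    return pack('N', strlen($data)) . $data;
}

function encodeMessage(string $value, ?string $key = null): string
{
    // The CRC covers everything after the CRC field itself.
    $body = pack('CC', 0, 0) // magic byte 0, attributes 0 (no compression)
          . encodeBytes($key)
          . encodeBytes($value);
    return pack('N', crc32($body)) . $body;
}

function encodeMessageSet(array $values): string
{
    $set = '';
    foreach ($values as $value) {
        $message = encodeMessage($value);
        // The producer can put any offset here; the broker assigns real ones.
        $set .= pack('J', 0) . pack('N', strlen($message)) . $message;
    }
    return $set;
}

$set = encodeMessageSet(['message1', 'message2']);
echo strlen($set), "\n"; // 68: two entries of 8 + 4 + 22 bytes
```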

array(
    'required_ack' => 1,
        // This field indicates how many acknowledgements the servers should receive before responding to the request. default `0`
        // If it is 0 the server will not send any response
        // If it is 1 the server will wait until the data is written to the leader's local log before sending a response
        // If it is -1 the server will block until the message is committed by all in sync replicas before sending a response
        // For any number > 1 the server will block waiting for this number of acknowledgements to occur
    'timeout' => 1000,
        // This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks.
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic that data is being published to.[String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition that data is being published to.
                    'messages' => array(
                        'message1', 
                        // [String] message
                    ),
                ),
            ),
        ),
    ),
);

Returns

Array

Example

$data = array(
    'required_ack' => 1,
    'timeout' => 1000,
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'messages' => array(
                        'message1',
                        'message2',
                    ),
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->produceRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->produceResponse();
var_dump($result);

Fetch API

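The fetch API is used to fetch a chunk of the logs of some topic partitions. Logically, the client specifies the topics, partitions and starting offset at which to begin fetching, and gets back a chunk of messages.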
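The chunk returned for each partition is a raw Kafka 0.8 MessageSet. Because the broker stops at max_bytes, the last entry in a chunk may be cut short. A decoder should therefore skip an incomplete trailing message rather than fail. The sketch below decodes an uncompressed message set under these assumptions and verifies each CRC. It is a standalone illustration, not kafka-php's Decoder. It assumes 64-bit PHP 7.1+, does not handle compressed message sets, and uses hypothetical function names.

```php
<?php
// Sketch: decode an uncompressed Kafka 0.8 MessageSet from a fetch response,
// skipping a partial trailing message cut off by max_bytes.
// Hypothetical helpers; assumes 64-bit PHP 7.1+, no compression support.

function readInt32(string $buf, int $pos): int
{
    $v = unpack('N', substr($buf, $pos, 4))[1];
    return $v >= 0x80000000 ? $v - 0x100000000 : $v; // unsigned -> signed
}

function decodeMessageSet(string $buf): array
{
    $messages = [];
    $pos = 0;
    $len = strlen($buf);
    while ($pos + 12 <= $len) { // offset (8) + size (4)
        $offset = unpack('J', substr($buf, $pos, 8))[1];
        $size = readInt32($buf, $pos + 8);
        if ($pos + 12 + $size > $len) {
            break; // partial trailing message: fetch it again later
        }
        $msg = substr($buf, $pos + 12, $size);
        if (unpack('N', substr($msg, 0, 4))[1] !== crc32(substr($msg, 4))) {
            throw new RuntimeException("CRC mismatch at offset $offset");
        }
        $p = 6; // skip CRC (4), magic byte (1) and attributes (1)
        $keyLen = readInt32($msg, $p);
        $p += 4;
        $key = $keyLen < 0 ? null : substr($msg, $p, $keyLen);
        $p += max($keyLen, 0);
        $valueLen = readInt32($msg, $p);
        $p += 4;
        $value = $valueLen < 0 ? null : substr($msg, $p, $valueLen);
        $messages[] = ['offset' => $offset, 'key' => $key, 'value' => $value];
        $pos += 12 + $size;
    }
    return $messages;
}

// Build a two-message set by hand (null keys) to exercise the decoder.
function buildEntry(int $offset, string $value): string
{
    $body = pack('CC', 0, 0) . pack('N', 0xFFFFFFFF) . pack('N', strlen($value)) . $value;
    $msg = pack('N', crc32($body)) . $body;
    return pack('J', $offset) . pack('N', strlen($msg)) . $msg;
}

$buf = buildEntry(10, 'hello') . buildEntry(11, 'world');
$truncated = substr($buf, 0, -5); // simulate max_bytes cutting the second message
foreach (decodeMessageSet($truncated) as $m) {
    echo $m['offset'], ': ', $m['value'], "\n"; // prints "10: hello"
}
```

The next fetch would start from the offset after the last complete message, here 11.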

\Kafka\Protocol\Encoder::fetchRequest

Parameter structure

array(
    'replica_id' => -1,
        // The replica id indicates the node id of the replica initiating this request. default `-1`
    'max_wait_time' => 100,
        // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. default 100 ms.
    'min_bytes' => 64 * 1024, // 64k
        // This is the minimum number of bytes of messages that must be available to give a response. default 64k.
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to fetch data from. [String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to fetch data from.
                    'offset' => 0,
                        // The offset to begin this fetch from. default 0
                    'max_bytes' => 100 * 1024 * 1024,
                        // The maximum number of bytes of messages to include for this partition; bounds the response size. default 100 MB
                ),
            ),
        ),
    ),
);

Returns

A \Kafka\Protocol\Fetch\Topic iterator

Example

$data = array(
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'offset' => 0, 
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchResponse();
var_dump($result);

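Offset API

This API describes the valid offset range available for a set of topic partitions. As with produce and fetch requests, the request must be sent to the broker that is currently the leader for the partitions in question. The leader can be determined with the metadata API.

\Kafka\Protocol\Encoder::offsetRequest

Parameter structure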
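A common use of this API is to look up a partition's earliest offset (time -2) or latest offset (time -1). The sketch below builds the $data array accepted by \Kafka\Protocol\Encoder::offsetRequest, using only the keys documented in this section. The helper buildOffsetRequest and the two constants are hypothetical conveniences, not part of kafka-php.

```php
<?php
// Sketch: build the documented offsetRequest $data array for several
// partitions of one topic. buildOffsetRequest() and the constants are
// hypothetical; only the array keys come from this README.

const OFFSET_LATEST = -1;   // ask for the latest offset
const OFFSET_EARLIEST = -2; // ask for the earliest available offset

function buildOffsetRequest(string $topic, array $partitionIds, int $time, int $maxOffsets = 1): array
{
    $partitions = [];
    foreach ($partitionIds as $id) {
        $partitions[] = [
            'partition_id' => $id,
            'time'         => $time,
            'max_offset'   => $maxOffsets,
        ];
    }
    return [
        'data' => [
            ['topic_name' => $topic, 'partitions' => $partitions],
        ],
    ];
}

$earliest = buildOffsetRequest('test', [0, 1], OFFSET_EARLIEST);
print_r($earliest['data'][0]['partitions'][1]);
```

The resulting array can be passed to offsetRequest as in the example below. It must still go to the leader of each partition.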

array(
    'replica_id' => -1,
        // The replica id indicates the node id of the replica initiating this request. default `-1`
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to get offsets for. [String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to get offsets for.
                    'time' => -1,
                        // Used to ask for all messages before a certain time (ms).
                        // Specify -1 to receive the latest offsets
                        // Specify -2 to receive the earliest available offset.
                    'max_offset' => 1,
                        // Maximum number of offsets to return. default 10000.
                ),
            ),
        ),
    ),
);

Returns

Array.

Example

$data = array(
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'max_offset' => 10, 
                    'time' => -1, 
                ),
            ),
        ),
    ),
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->offsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->offsetResponse();
var_dump($result);

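Metadata API

The returned metadata is at the partition level, but grouped by topic for convenience and to avoid redundancy. For each partition, the metadata contains information about the leader, all replicas, and the list of replicas that are currently in sync.

\Kafka\Protocol\Encoder::metadataRequest

Parameter structure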
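In the Kafka 0.8 (v0) metadata response, each partition entry is laid out as an int16 error code, an int32 partition id, an int32 leader broker id (-1 while no leader exists), and two int32 arrays for the replicas and the in-sync replicas. Each array is an int32 count followed by its items. The sketch below decodes one such entry from raw bytes to make that structure concrete. It is not the library's Decoder::metadataResponse, whose return format is not documented here. The function names are hypothetical, and 64-bit PHP 7+ is assumed.

```php
<?php
// Sketch: decode one PartitionMetadata entry of a Kafka 0.8 metadata response:
//   PartitionErrorCode:int16 PartitionId:int32 Leader:int32
//   Replicas:[int32] Isr:[int32]   (array = int32 count + items)
// Hypothetical helpers; assumes 64-bit PHP 7+.

function readInt16(string $buf, int &$pos): int
{
    $v = unpack('n', substr($buf, $pos, 2))[1];
    $pos += 2;
    return $v >= 0x8000 ? $v - 0x10000 : $v; // unsigned -> signed
}

function readInt32(string $buf, int &$pos): int
{
    $v = unpack('N', substr($buf, $pos, 4))[1];
    $pos += 4;
    return $v >= 0x80000000 ? $v - 0x100000000 : $v; // unsigned -> signed
}

function readInt32Array(string $buf, int &$pos): array
{
    $items = [];
    for ($n = readInt32($buf, $pos); $n > 0; $n--) {
        $items[] = readInt32($buf, $pos);
    }
    return $items;
}

function decodePartitionMetadata(string $buf, int &$pos): array
{
    $meta = [];
    $meta['error_code']   = readInt16($buf, $pos);
    $meta['partition_id'] = readInt32($buf, $pos);
    $meta['leader']       = readInt32($buf, $pos);      // broker id, -1 if no leader
    $meta['replicas']     = readInt32Array($buf, $pos);
    $meta['isr']          = readInt32Array($buf, $pos); // in-sync replicas
    return $meta;
}

// Partition 0, led by broker 1, replicated on brokers 1 and 2, both in sync.
$bytes = pack('n', 0) . pack('N', 0) . pack('N', 1)
       . pack('N', 2) . pack('N', 1) . pack('N', 2)
       . pack('N', 2) . pack('N', 1) . pack('N', 2);
$pos = 0;
$meta = decodePartitionMetadata($bytes, $pos);
echo $meta['leader'], ' ', implode(',', $meta['isr']), "\n"; // 1 1,2
```

The leader id is what tells a client which broker must receive produce, fetch and offset requests for that partition.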

array(
   'topic_name1', // topic name
);

Returns

Array.

Example

$data = array(
    'test'
);

$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->metadataRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->metadataResponse();
var_dump($result);

Offset Commit API

These APIs allow offsets to be managed centrally.

\Kafka\Protocol\Encoder::commitOffsetRequest

Parameter structure
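A consumer typically commits after processing a batch. The usual Kafka convention is to commit the offset of the next message to read, that is, the last processed offset plus one. The sketch below builds the $data array for \Kafka\Protocol\Encoder::commitOffsetRequest from a map of partition id to last processed offset. The helper name is hypothetical; only the array keys come from this README.

```php
<?php
// Sketch: build the documented commitOffsetRequest $data array from
// partition id => last processed offset. buildCommitRequest() is
// hypothetical. The committed value is the next offset to read (last + 1).

function buildCommitRequest(string $group, string $topic, array $lastProcessed): array
{
    $partitions = [];
    foreach ($lastProcessed as $partitionId => $lastOffset) {
        $partitions[] = [
            'partition_id' => $partitionId,
            'offset'       => $lastOffset + 1,
            'time'         => -1, // let the broker stamp the receive time
        ];
    }
    return [
        'group_id' => $group,
        'data'     => [['topic_name' => $topic, 'partitions' => $partitions]],
    ];
}

$request = buildCommitRequest('testgroup', 'test', [0 => 1, 2 => 41]);
print_r($request['data'][0]['partitions']);
```

With this convention, a consumer that restarts can pass the committed value straight to setPartition without re-reading the last processed message.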

array(
    'group_id' => 'testgroup',
        // consumer group 
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to commit offsets for. [String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to commit the offset for.
                    'offset' => 0,
                        // The offset to commit.
                    'time' => -1, 
                        // If the time stamp field is set to -1, then the broker sets the time stamp to the receive time before committing the offset.
                ),
            ),
        ),
    ),
);

Returns

Array.

Example

$data = array(
    'group_id' => 'testgroup',
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'offset' => 2, 
                ),
            ),
        ),
    ),
);


$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->commitOffsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->commitOffsetResponse();
var_dump($result);

Offset Fetch API

These APIs allow offsets to be managed centrally.

\Kafka\Protocol\Encoder::fetchOffsetRequest

Parameter structure
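After fetching a group's committed offset, a consumer still has to decide where to resume. The sketch below shows one possible policy. It assumes, rather than takes from this README, that a negative committed offset means nothing has been committed. It also assumes that $earliest and $latest come from offset requests with time -2 and -1. Resetting to the earliest offset when the committed one is missing or out of range is a design choice made for this sketch; resetting to the latest offset would be equally valid. The function name is hypothetical.

```php
<?php
// Sketch: pick the resume offset for one partition.
// Assumptions: a negative $committed means no committed offset; $earliest and
// $latest come from offset requests with time -2 and -1. resumeOffset() is
// hypothetical and always falls back to the earliest offset.

function resumeOffset(int $committed, int $earliest, int $latest): int
{
    if ($committed < 0) {
        return $earliest; // nothing committed yet: start from the beginning
    }
    if ($committed < $earliest || $committed > $latest) {
        return $earliest; // out of range, e.g. old log segments were deleted
    }
    return $committed; // resume exactly where the group left off
}

echo resumeOffset(-1, 5, 100), "\n"; // 5
echo resumeOffset(42, 5, 100), "\n"; // 42
echo resumeOffset(3, 5, 100), "\n";  // 5
```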

array(
    'group_id' => 'testgroup',
        // consumer group 
    'data' => array(
        array(
            'topic_name' => 'testtopic',
                // The topic to fetch committed offsets for. [String]
            'partitions' => array(
                array(
                    'partition_id' => 0,
                        // The partition to fetch the committed offset for.
                ),
            ),
        ),
    ),
);

Returns

Array.

Example

$data = array(
    'group_id' => 'testgroup',
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                ),
            ),
        ),
    ),
);


$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchOffsetRequest($data);

$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchOffsetResponse();
var_dump($result);