xiaoe-tech/kafka-php

PHP Kafka 客户端

v0.2.1.0 2020-07-21 09:05 UTC

README

中文文档

QQ Group Build Status Packagist Packagist Packagist GitHub issues GitHub forks GitHub stars GitHub license

Kafka-php 是一个纯 PHP Kafka 客户端,目前支持 Kafka 0.8.x 以上的版本。本项目 v0.2.x 和 v0.1.x 不兼容,如果使用原始的 v0.1.x,您可以参考文档 Kafka PHP v0.1.x 文档,但建议切换到 v0.2.x。v0.2.x 使用 PHP 异步实现和 Kafka 代理交互,比 v0.1.x 更稳定、更高效,因为使用 PHP 语言,所以无需编译任何扩展即可使用,从而降低访问和维护成本。

要求

  • 最小 PHP 版本:5.5
  • Kafka 版本高于 0.8
  • 消费者模块需要 Kafka 代理版本高于 0.9.0

安装

将 lib 目录添加到 PHP include_path 中,并使用示例目录中类似的自动加载器(代码遵循 PEAR/Zend 一类一个文件约定)。

Composer 安装

如果您使用 Composer 来管理项目的依赖项,只需将 nmred/kafka-php 添加到您的项目 composer.json 文件中。以下是一个 composer.json 文件的示例:

{
	"require": {
		"xiaoe/kafka-php": "0.2.*"
	}
}

配置

配置属性在 配置 中有文档说明

生产者

异步模式

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
	return array(
		array(
			'topic' => 'test',
			'value' => 'test....message.',
			'key' => 'testkey',
			),
	);
});
$producer->setLogger($logger);
$producer->success(function($result) {
	var_dump($result);
});
$producer->error(function($errorCode) {
		var_dump($errorCode);
});
$producer->send(true);

同步模式

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
        $result = $producer->send(array(
                array(
                        'topic' => 'test1',
                        'value' => 'test1....message.',
                        'key' => '',
                ),
        ));
        var_dump($result);
}

消费者

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(array('test'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

低级 API

参考 示例