adaddad/kafka-php

PHP的Kafka客户端

维护者

详细信息

github.com/adaddad/kafka-php

主页

源码

安装: 734

依赖: 0

推荐者: 0

安全: 0

星标: 0

关注者: 0

分支: 450

v0.2.0.8 2017-10-23 09:33 UTC

README

中文文档

QQ Group Build Status Packagist Packagist Packagist GitHub issues GitHub forks GitHub stars GitHub license

Kafka-php 是一个纯PHP Kafka客户端,目前支持大于0.8.x版本的Kafka,本项目v0.2.x和v0.1.x版本不兼容,如果使用原始的v0.1.x版本,您可以参考Kafka PHP v0.1.x 文档,但建议切换到v0.2.x版本。v0.2.x版本使用PHP异步实现和Kafka broker交互,比v0.1.x版本更稳定,效率更高,因为使用PHP语言,所以不需要编译任何扩展,可以降低访问和维护成本

要求

  • 最低PHP版本:7.1
  • Kafka版本大于0.8
  • 消费者模块需要Kafka broker版本大于0.9.0

安装

将lib目录添加到PHP include_path中,并使用示例目录中的自动加载器(代码遵循PEAR/Zend单文件单类规范)。

Composer安装

如果您使用Composer管理项目依赖,只需将nmred/kafka-php作为依赖项添加到您的项目中。

$ composer require nmred/kafka-php

以下是一个 composer.json 文件的示例

{
	"require": {
		"adaddad/kafka-php": "0.2.*"
	}
}

配置

配置属性在配置中进行了文档说明

生产者

异步模式

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ],
        ];
    }
);
$producer->setLogger($logger);
$producer->success(function($result) {
	var_dump($result);
});
$producer->error(function($errorCode) {
		var_dump($errorCode);
});
$producer->send(true);

同步模式

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ],
    ]);
}

消费者

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

低级API

参考示例

QQ群

群1:531522091 群2:657517955 QQ Group