xiaoe / kafka-php
PHP Kafka 客户端
v0.2.1.0
2020-07-21 09:05 UTC
Requires (Dev)
- kmelia/monolog-stdout-handler: 1.2.1
- monolog/monolog: 1.22.1
- phpunit/phpcov: *
- phpunit/phpunit: ~4.0
- satooshi/php-coveralls: dev-master
README
Kafka-php 是一个纯 PHP Kafka 客户端,目前支持大于 0.8.x 版本的 Kafka,本项目 v0.2.x 和 v0.1.x 不兼容。如果使用原始的 v0.1.x,您可以参考文档 Kafka PHP v0.1.x 文档,但建议切换到 v0.2.x。v0.2.x 使用 PHP 异步实现和 Kafka 代理交互,比 v0.1.x 更稳定、更高效,因为使用 PHP 语言,所以不需要任何扩展,可以降低访问和维护成本。
要求
- 最小 PHP 版本:5.5
- Kafka 版本大于 0.8
- 消费者模块需要 Kafka 代理版本大于 0.9.0
安装
将 lib 目录添加到 PHP include_path,并使用示例目录中类似的自动加载器(代码遵循 PEAR/Zend 一类一个文件约定)。
Composer 安装
如果您使用 Composer 管理项目的依赖项,只需在项目的 composer.json 文件中添加 nmred/kafka-php 依赖即可。以下是一个 composer.json 文件的示例
{
"require": {
"xiaoe/kafka-php": "0.2.*"
}
}
配置
配置属性在 配置 中有文档说明
生产者
异步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true);
同步模式
<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);
for($i = 0; $i < 100; $i++) {
$result = $producer->send(array(
array(
'topic' => 'test1',
'value' => 'test1....message.',
'key' => '',
),
));
var_dump($result);
}
消费者
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
低级 API
参考 示例