hferradj / kafka-php
PHP的Kafka客户端
v0.2.0.14
2021-10-18 15:07 UTC
Requires
- php: ^7.1|^8.0
- amphp/amp: ^2.0.3
- lcobucci/clock: ^1.0
- psr/log: ^1.0
Suggests
- ext-krb5: To be able to ues the GSSAPI SASL mechanism
README
Kafka-php是一个纯PHP的Kafka客户端,目前支持大于0.8.x版本的Kafka,本项目v0.2.x和v0.1.x版本不兼容,如果使用原始的v0.1.x版本,您可以参考文档Kafka PHP v0.1.x 文档,但建议切换到v0.2.x版本。v0.2.x版本使用PHP异步实现和Kafka代理交互,比v0.1.x版本更稳定、更高效,因为使用PHP语言,所以无需编译任何扩展,可以降低访问和维护成本
要求
- 最低PHP版本:7.1
- Kafka版本大于0.8
- 消费者模块需要Kafka代理版本大于0.9.0
安装
将lib目录添加到PHP的include_path,并使用示例目录中的自动加载器(代码遵循PEAR/Zend的一个类一个文件规范)。
Composer安装
如果您使用Composer管理项目的依赖关系,只需将依赖项nmred/kafka-php添加到您的项目中。
$ composer require nmred/kafka-php
以下是一个 composer.json 文件的示例
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
配置
配置属性在配置中有文档说明
生产者
异步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer( function() { return [ [ 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ], ]; } ); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true);
同步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(); $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $producer->send([ [ 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ], ]); }
消费者
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('1.0.0'); $config->setTopics(['test']); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
低级API
参考示例
