nmred / kafka-php
PHP的Kafka客户端
v0.2.0.8
2017-10-23 09:33 UTC
Requires (Dev)
- kmelia/monolog-stdout-handler: 1.2.1
- monolog/monolog: 1.22.1
- phpunit/phpcov: *
- phpunit/phpunit: ~4.0
- satooshi/php-coveralls: dev-master
This package is not auto-updated.
Last update: 2024-09-12 18:28:52 UTC
README
Kafka-php是一个纯PHP Kafka客户端,目前支持大于0.8.x版本的Kafka,该项目v0.2.x和v0.1.x版本不兼容,如果使用原始的v0.1.x版本,您可以参考Kafka PHP v0.1.x 文档,但建议切换到v0.2.x版本。v0.2.x使用PHP异步实现和Kafka代理交互,比v0.1.x更稳定、更高效,因为使用PHP语言,所以无需编译任何扩展即可使用,以降低访问和维护成本。
要求
- 最低PHP版本:7.1
- Kafka版本大于0.8
- 消费者模块需要Kafka代理版本大于0.9.0
安装
将lib目录添加到PHP include_path,并使用示例目录中的自动加载器(代码遵循PEAR/Zend的单文件单类约定)。
Composer安装
如果您使用Composer管理项目的依赖项,只需将依赖项nmred/kafka-php
添加到您的项目中。
$ composer require nmred/kafka-php
以下是一个composer.json文件的最小示例
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
配置
配置属性在配置中有文档说明
生产者
异步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer( function() { return [ [ 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ], ]; } ); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true);
同步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(); $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $producer->send([ [ 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ], ]); }
消费者
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('1.0.0'); $config->setTopics(['test']); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
低级API
参考示例