comecc / kafka-php
v0.3.0.0
2020-06-17 05:57 UTC
Requires
- php: ^7.1
- amphp/amp: ^2.0.3
- lcobucci/clock: ^1.0
- psr/log: ^1.0
Requires (Dev)
- doctrine/coding-standard: ^2.1
- infection/infection: ^0.7
- kmelia/monolog-stdout-handler: ^1.2
- mikey179/vfsstream: ^1.6.5
- monolog/monolog: ^1.23
- phpstan/phpstan: ^0.9
- phpstan/phpstan-phpunit: ^0.9
- phpstan/phpstan-strict-rules: ^0.9
- phpunit/phpcov: ^4.0
- phpunit/phpunit: ^6.5
- satooshi/php-coveralls: 2.0.0
- slevomat/coding-standard: ^4.1
- squizlabs/php_codesniffer: ^3.2
Suggests
- ext-krb5: To be able to ues the GSSAPI SASL mechanism
README
Kafka-php 是一个纯 PHP Kafka 客户端,目前支持 Kafka 0.8.x 以上版本,本项目 v0.2.x 和 v0.1.x 不兼容,如果使用原始的 v0.1.x,您可以参考文档 Kafka PHP v0.1.x 文档,但建议切换到 v0.2.x。v0.2.x 使用 PHP 异步实现和 Kafka 代理交互,比 v0.1.x 更稳定,效率更高,因为使用 PHP 语言,所以不需要编译任何扩展,可以用来减少访问和维护成本
要求
- 最低 PHP 版本:7.1
- Kafka 版本大于 0.8
- 消费者模块需要 Kafka 代理版本大于 0.9.0
安装
将 lib 目录添加到 PHP include_path,并使用示例目录中的自动加载器(代码遵循 PEAR/Zend 单类单文件约定)。
Composer 安装
如果您使用 Composer 管理项目的依赖项,请将依赖项 nmred/kafka-php
添加到您的项目中。
$ composer require nmred/kafka-php
以下是一个 composer.json 文件的最小示例
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
配置
配置属性在 配置 中有文档说明
生产者
异步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer( function() { return [ [ 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ], ]; } ); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true);
同步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(); $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $producer->send([ [ 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ], ]); }
消费者
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('1.0.0'); $config->setTopics(['test']); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
低级 API
参考 示例