g4197 / kafka-php
PHP 的 Kafka 客户端
v0.0.3
2024-06-30 11:04 UTC
Requires
- php: >=8.0
- amphp/amp: ^2.0.3
- lcobucci/clock: ^1.0
- psr/log: ^2.0|^3.0
Requires (Dev)
- maglnet/composer-require-checker: ^4.7
- mikey179/vfsstream: ^1.6.5
- phpunit/phpunit: ^10.5
- rector/rector: ^1.0.3
- roave/infection-static-analysis-plugin: ^1.34
- spatie/phpunit-watcher: ^1.23
- vimeo/psalm: ^5.16
- yiisoft/test-support: ^3.0
Suggests
- ext-krb5: To be able to use the GSSAPI SASL mechanism
This package is auto-updated.
Last update: 2024-09-30 11:41:00 UTC
README
从 kafka-php 分支。
分支原因
- 不兼容的库
- 旧 PHP 版本(7.1)
Kafka-php 是一个纯 PHP Kafka 客户端,目前支持 Kafka 0.8.x 以上的版本。该项目 v0.2.x 和 v0.1.x 版本不兼容,如果使用原始的 v0.1.x 版本,您可以参考文档 Kafka PHP v0.1.x 文档,但建议切换到 v0.2.x。v0.2.x 使用 PHP 异步实现和 Kafka 代理交互,比 v0.1.x 更稳定、更高效,因为使用 PHP 语言,所以不需要任何扩展即可使用,可以减少访问和维护成本。
要求
- 最低 PHP 版本:8.0
- Kafka 版本大于 0.8
- 消费者模块需要 Kafka 代理版本大于 0.9.0
安装
可以使用 composer 安装此包
composer require g41797/kafka-php
配置
配置属性在 配置 中有文档说明
生产者
异步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer( function() { return [ [ 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ], ]; } ); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true);
同步模式
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(); $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $producer->send([ [ 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ], ]); }
消费者
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('1.0.0'); $config->setTopics(['test']); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
低级 API
参考 示例