arquivei / php-kafka-consumer
A Kafka consumer for PHP
Requires
- php: ~7.2 || ~7.3 || ~7.4 || ^8.0
- ext-rdkafka: ~3.0 || ~3.1 || ~4.0 || ~5.0 || ~6.0
- illuminate/console: ~6 || ~7 || ~8 || ~9 || ~10 || ~11
- monolog/monolog: ~1 || ~2 || ~3
Requires (Dev)
- phpunit/phpunit: ~7 || ~8 || ~9 || ~10 || ~11
This package is auto-updated.
Last update: 2024-09-12 19:29:49 UTC
README
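An Apache Kafka consumer in PHP. Subscribe to topics and define callbacks to handle the messages.
Requirements
To use this library, you need the php-rdkafka PECL extension. Note that the extension itself requires the librdkafka C library.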
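As a quick sanity check before installing, a script along these lines could confirm that the extension is loaded. This is only a sketch: the function name `rdkafkaStatus` is hypothetical and not part of the package; it relies solely on PHP's built-in `extension_loaded()` and `phpversion()`.

```php
<?php

// Hypothetical pre-flight check; not part of php-kafka-consumer.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'missing';
    }

    // phpversion() returns the extension's version string, or false if it has none.
    $version = phpversion('rdkafka');

    return $version === false ? 'loaded' : 'loaded ' . $version;
}

echo 'rdkafka: ' . rdkafkaStatus() . PHP_EOL;
```

If the script reports `missing`, install librdkafka first and then the PECL extension.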
Minimum requirements
Installation
Using composer
composer require arquivei/php-kafka-consumer
Usage
<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\ConsumerBuilder;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer
{
    public function __invoke(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withSasl(new Sasl('username', 'password', 'mechanisms'))
    ->withCommitBatchSize(1)
    ->withSecurityProtocol('security-protocol')
    ->withHandler(new DefaultConsumer()) // or any callable
    ->build();

$consumer->consume();
Or use the legacy API
<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\Contracts\Consumer;
use Kafka\Consumer\Entities\Config;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer extends Consumer
{
    public function handle(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$config = new Config(
    new Sasl(
        'username',
        'password',
        'mechanisms'
    ),
    ['topic'],
    'broker:port',
    1,
    'php-kafka-consumer-group-id',
    new DefaultConsumer(),
    'PLAINTEXT',
    'topic-dlq',
    1,
    6
);

$consumer = new \Kafka\Consumer\Consumer($config);
$consumer->consume();
Using with Laravel
You need to add php-kafka-config.php to the config path
<?php

return [
    'topic' => 'topic',
    'broker' => 'broker',
    'groupId' => 'group-id',
    'securityProtocol' => 'security-protocol',
    'sasl' => [
        'mechanisms' => 'mechanisms',
        'username' => 'username',
        'password' => 'password',
    ],
];
Run the consumer with the command
$ php artisan arquivei:php-kafka-consumer --consumer="App\Consumers\YourConsumer" --commit=1
Middlewares
Middlewares are simple callables that receive two arguments: the message being handled and the next handler. Possible use cases include message transformation, filtering, logging, or even transaction handling.
<?php

use Kafka\Consumer\ConsumerBuilder;

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withHandler(function ($message) {/** ... */})
    // You may add any number of middlewares, they will be executed in the order provided
    ->withMiddleware(function (string $rawMessage, callable $next): void {
        $decoded = json_decode($rawMessage, true);
        $next($decoded);
    })
    ->withMiddleware(function (array $message, callable $next): void {
        if (! isset($message['foo'])) {
            return;
        }
        $next($message);
    })
    ->build();

$consumer->consume();
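To illustrate the ordering, here is a minimal sketch in plain PHP of how a chain like this could be composed. The helper `composeMiddlewares` is hypothetical and is not the package's own implementation; it only demonstrates the idea that the first middleware added runs first and that a middleware can stop the chain by not calling `$next`.

```php
<?php

/**
 * Hypothetical helper (not part of php-kafka-consumer): wraps $handler so that
 * the middlewares run in the order given, each deciding whether to call $next.
 */
function composeMiddlewares(callable $handler, array $middlewares): callable
{
    // Fold from the last middleware to the first so the first ends up outermost.
    return array_reduce(
        array_reverse($middlewares),
        static function (callable $next, callable $middleware): callable {
            return static function ($message) use ($middleware, $next): void {
                $middleware($message, $next);
            };
        },
        $handler
    );
}

$handled = [];

$pipeline = composeMiddlewares(
    // Final handler: record every message that reaches it.
    function (array $message) use (&$handled): void {
        $handled[] = $message;
    },
    [
        // Runs first: decode the raw JSON payload and pass it on.
        function (string $rawMessage, callable $next): void {
            $next(json_decode($rawMessage, true));
        },
        // Runs second: drop messages without a "foo" key by not calling $next.
        function (array $message, callable $next): void {
            if (!isset($message['foo'])) {
                return;
            }
            $next($message);
        },
    ]
);

$pipeline('{"foo": 1}'); // reaches the handler
$pipeline('{"bar": 2}'); // filtered out by the second middleware

echo count($handled) . PHP_EOL; // prints 1
```

Folding over the reversed list keeps each middleware unaware of what follows it, so transformation and filtering steps can be reordered without changing their code.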
Build and test
If you want to contribute, there are a few utilities that will help.
First, create a container
docker compose up -d --build
If you have make, you can use the predefined commands in the Makefile
make build
Then install the dependencies
docker compose exec php-fpm composer install
Or with make
make composer install
You can run the tests locally
docker compose exec php-fpm ./vendor/phpunit/phpunit/phpunit tests
Or with make
make test
And check the coverage
docker compose exec php-fpm phpdbg -qrr ./vendor/bin/phpunit --whitelist src/ --coverage-html coverage/
Or with make
make coverage