arquivei/php-kafka-consumer

PHP的Kafka消费者

安装数: 50 170

依赖项: 1

建议者: 0

安全性: 0

星标: 5

关注者: 13

分支: 2

开放问题: 11

类型:项目

2.6.1 2024-09-12 19:26 UTC

README

Latest Stable Version Total Downloads Tests Dependency coverage

PHP中的Apache Kafka消费者。订阅主题并定义回调来处理消息。

需求

为了使用这个库,您需要php-rdkafka PECL扩展。请注意,该扩展需要librdkafka C库。

最低要求

安装

使用composer

composer require arquivei/php-kafka-consumer

用法

<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\ConsumerBuilder;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer
{
    public function __invoke(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withSasl(new Sasl('username', 'pasword', 'mechanisms'))
    ->withCommitBatchSize(1)
    ->withSecurityProtocol('security-protocol')
    ->withHandler(new DefaultConsumer()) // or any callable
    ->build();

$consumer->consume();

或使用旧版API

<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\Contracts\Consumer;
use Kafka\Consumer\Entities\Config;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer extends Consumer
{
    public function handle(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$config = new Config(
    new Sasl(
        'username',
        'password',
        'mechanisms'
    ),
    ['topic'],
    'broker:port',
    1,
    'php-kafka-consumer-group-id',
    new DefaultConsumer(),
    'PLAINTEXT',
    'topic-dlq',
    1,
    6
);

$consumer = new \Kafka\Consumer\Consumer($config);
$consumer->consume();

与Laravel一起使用

您需要在config路径中添加php-kafka-consig.php

<?php

return [
    'topic' => 'topic',
    'broker' => 'broker',
    'groupId' => 'group-id',
    'securityProtocol' => 'security-protocol',
    'sasl' => [
        'mechanisms' => 'mechanisms',
        'username' => 'username',
        'password' => 'password',
    ],
];

使用命令执行消费者

$ php artisan arquivei:php-kafka-consumer --consumer="App\Consumers\YourConsumer" --commit=1

中间件

中间件是接收两个参数的简单调用函数:正在处理的消息和下一个处理程序。中间件的一些可能的用例包括:消息转换、过滤、日志记录或甚至是事务处理,您的想象力是无限的。

<?php

use Kafka\Consumer\ConsumerBuilder;

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withHandler(function ($message) {/** ... */})
    // You may add any number of middlewares, they will be executed in the order provided
    ->withMiddleware(function (string $rawMessage, callable $next): void {
        $decoded = json_decode($rawMessage, true);
        $next($decoded);
    })
    ->withMiddleware(function (array $message, callable $next): void {
        if (! isset($message['foo'])) {
            return;
        }
        $next($message);
    })
    ->build();

$consumer->consume();

构建和测试

如果您想贡献,有一些实用工具可以帮助您。

首先创建一个容器

docker compose up -d --build

如果您有make,您可以使用Makefile中的预定义命令

make build

然后安装依赖项

docker compose exec php-fpm composer install

或使用make

make composer install

您可以在本地运行测试

docker compose exec php-fpm ./vendor/phpunit/phpunit/phpunit tests

或使用make

make test

并检查覆盖率

docker compose exec php-fpm phpdbg -qrr ./vendor/bin/phpunit --whitelist src/ --coverage-html coverage/

或使用make

make coverage