ensi / laravel-phprdkafka
Laravel 与 php-rdkafka 之间的桥接包
0.4.0
2024-06-26 12:52 UTC
Requires
- php: ^8.1
- ext-rdkafka: ^5.0 || ^6.0
- laravel/framework: ^9.0 || ^10.0 || ^11.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.2
- nunomaduro/collision: ^6.0 || ^7.0 || ^8.1
- orchestra/testbench: ^7.0 || ^8.0 || ^9.0
- pestphp/pest: ^1.22 || ^2.0
- pestphp/pest-plugin-laravel: ^1.1 || ^2.0
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^1.11
- spaze/phpstan-disallowed-calls: ^2.15
README
此包允许您在 config/kafka.php 中描述 Kafka 生产者和消费者,然后在任何地方重用它们。
安装
您可以通过 composer 安装此包
composer require ensi/laravel-phprdkafka
使用以下命令发布配置文件
php artisan vendor:publish --provider="Ensi\LaravelPhpRdKafka\LaravelPhpRdKafkaServiceProvider" --tag="kafka-config"
现在转到 config/kafka.php
并在那里配置您的生产者和消费者。通常每个 Kafka 集群需要一个生产者/消费者。配置参数可以在 Librdkafka 配置参考 中找到
版本兼容性
基本用法
生产者示例
$producer = \Kafka::producer('producer-name'); // returns a configured RdKafka\Producer singleton. // or $producer = \Kafka::producer(); if you want to get the default producer. // or $producer = $kafkaManager->producer(); where $kafkaManager is an instance of Ensi\LaravelPhpRdKafka\KafkaManager resolved from the service container. // now you can implement any producer logic e.g: $headers = []; $topicName = 'test-topic'; $topic = $producer->newTopic($topicName); for ($i = 0; $i < 10; $i++) { $payload = json_encode([ 'body' => "Message $i in topic [$topicName]", 'headers' => $headers ]); $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload); $producer->poll(0); } for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { break; } } if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { // Log and/or throw "Unable to flush Kafka producer, messages of topic [$topicName] might be lost.' exception. } // If you use php-fpm and producing is slow you can move its execution to the place after response has been sent. // This can be achieved e.g. by wrapping the whole producing or at least flushing in it in a "terminating" callback. // app()->terminating(function () { ... });
消费者示例
public function handle(KafkaManager $kafkaManager) { $consumer = $kafkaManager->consumer('consumer-name'); $consumer->subscribe(['test-topic-1', 'test-topic-2']); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $this->info($message->payload); $this->processMessage($message); // do something with the message // $consumer->commitAsync($message); // commit offsets asynchronously if you set 'enable.auto.commit' => false, in config/kafka.php break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: // this also happens when there is no new messages in the topic after the specified timeout: https://github.com/arnaud-lb/php-rdkafka/issues/343 echo "Timed out\n"; break; default: throw new Exception($message->errstr(), $message->err); break; } } }
您可以在 php-rdkafka 示例 中了解更多关于 php-rdkafka 生产者和消费者信息
以下 getter 可以提供对 RdKafka\Conf
实例的直接访问
$producerConf = $kafkaManager->producerConfig('producer-name'); $consumerConf = $kafkaManager->consumerConfig('consumer-name');
贡献
请参阅 CONTRIBUTING 以获取详细信息。
测试
- composer install
- composer test
安全漏洞
请查看 我们的安全策略 了解如何报告安全漏洞。
许可证
MIT 许可证 (MIT)。请参阅 许可证文件 了解更多信息。