grongor / kafka-rest-client
Apache Kafka的Confluent REST代理客户端
Requires
- php: ^7.2|^8.0
- beberlei/assert: ^3.2
- jms/serializer: ^3.2
- lcobucci/clock: ^1.2
- psr/http-client: ^1.0
- psr/http-factory: ^1.0
- psr/http-message: ^1.0.1
- psr/log: ^1.1
- teapot/status-code: ^1.1
- thecodingmachine/safe: ^0.1.16
Requires (Dev)
- cdn77/coding-standard: ^2.0
- guzzlehttp/psr7: ^1.6
- http-interop/http-factory-guzzle: ^1.0
- jakub-onderka/php-parallel-lint: ^1.0.0
- mockery/mockery: ^1.2.3
- php-http/message: ^1.0
- phpstan/extension-installer: ^1.0
- phpstan/phpstan: ^0.11.19
- phpstan/phpstan-beberlei-assert: ^0.11.2
- phpstan/phpstan-mockery: ^0.11.3
- phpstan/phpstan-phpunit: ^0.11.2
- phpstan/phpstan-strict-rules: ^0.11.1
- phpunit/phpunit: ^8.0
- roave/security-advisories: dev-master
- slevomat/coding-standard: dev-master as 5.0.4
- thecodingmachine/phpstan-safe-rule: ^0.1.4
Conflicts
- doctrine/annotations: <1.7.0
This package is auto-updated.
Last update: 2024-08-31 00:32:17 UTC
README
这是一个现代PHP客户端,用于Confluent REST代理(版本2)。
用于与API通信的HTTP客户端遵循现代的HTTP标准,如PSR-7、PSR-17和PSR-18。
除了实现REST代理API,此库还添加了一些方便的类,这些类对于与Apache Kafka的交互非常有用 - 请参阅示例以获取完整列表和简单教程,说明如何使用它们。
缺少的功能
一些功能被有意跳过以简化实现/缩短开发时间。如果您需要任何东西并且愿意创建一个pull request,我会很高兴查看它(如果合理)并将其合并。您可以通过打开问题来开始讨论。
- REST代理API的版本1
- 升级REST代理很容易,因此我认为实现第一个版本没有意义。
- AVRO嵌入格式
- JSON嵌入格式
- 我认为二进制格式足够了。您可以在与该库交互之前/之后始终序列化/反序列化您的对象,因此直接集成在这里只是增加了复杂性。
- 异步操作
- 为了简化库,所有方法都是同步的。如果需要,这肯定会改变。
- 此处提到的所有尚未实现的功能(在此处)
- 实现这些功能后,它们将尽快添加到这里。
示例
生产者
生产者允许您逐条或批量生产消息。两者都使用相同的底层API方法,因此使用批量方法更有效。
成功时,`produce` 和 `produceBatch` 都不返回任何内容,失败时抛出异常。可能会有部分成功/失败,因此抛出的异常 `FailedToProduceMessages` 包含两个公共属性,`$retryable` 和 `$nonRetryable`,其中每个都包含一个数组,`['error' => 'Kafka提供的错误', message => (生产给消息的对象)]`。错误是否可重试基于 `error_code`,如文档所述。您如何处理这些错误取决于您。
$producer = new Producer($restClient); $producer->produce('your-topic', new Message('some-message')); $messages = [new Message('some-message'), new Message('and-some-other-message')]; $producer->produceBatch('your-topic', $messages);
消费者
消费者允许您逐条消费消息,直到您从循环中返回、应用程序抛出异常或以其他方式被迫退出。`consume` 方法返回一个 生成器 并无限循环,在消息可用时产生消息。`consume` 方法接受两个参数:`timeout` 和 `maxBytes`。`timeout` 是消费者等待消息的最大时间。`maxBytes` 是单个请求中要获取的消息的最大大小。这两个设置与 `ConsumerOptions` 中的设置以及服务器设置互补(请参阅Kafka文档以获取更多信息)。
如果您将消费者选项 autoCommitEnable
设置为 false
,则可以使用消费者的 commit
方法提交消息。只需传入您希望提交的消息。在大多数情况下,建议关闭自动提交并手动提交每条消息,这样您的应用程序在处理过程中死亡时就不会“丢失”任何消息。
您还可以(可选地)使用 setIdleCallback
方法设置一个空闲回调。当没有消息可提供时,将调用此回调。如果没有提供,则空闲间隔等于 timeout
参数,如果设置了 consumerRequestTimeoutMs
选项,则等于该选项,否则等于代理配置选项 consumer.request.timeout.ms
。
$consumerFactory = new ConsumerFactory($restClient); $consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic')); foreach ($consumer->consume() as $message) { // Do your magic $logger->info('Got new message', $message->content); // ... and when you are done, commit the message (if you turned off auto-committing). $consumer->commit($message); }
BatchConsumer
BatchConsumer 与 Consumer 的工作方式相同;不同之处在于 BatchConsumer 不单独提供每条消息,而是首先将它们放入批次(MessagesBatch
)。这些批次可以通过消息数量(maxCount
)、时间(maxDuration
)或两者(设置 maxCount
和 maxDuration
)进行配置来限制。如果您设置了 maxDuration
,则批次将不会超过该时间(加一些处理时间),因为它改变了消费者的 timeout
参数(消费者不会在网络中等待更多消息而卡住)。
BatchConsumer 对于处理“大型”数据集非常有用,否则您必须自己批量处理消息(例如,在插入数据库时,批量操作总是更好的)。
如 Consumer 示例中所述,您可能需要提交消息。为此,有一个 commit
方法,它接受提供的 MessagesBatch
并在一个请求中提交其内部的所有消息。
$batchConsumerFactory = new BatchConsumerFactory($restClient); $batchConsumer = $batchConsumerFactory->create( 'your-consumer-group', Subscription::topic('your-topic'), $maxCount = 10000, // Yield the batch when there is 10 000 messages in it $maxDuration = 60 // or when 60 seconds passes, whichever comes first. ); foreach ($batchConsumer->consume() as $messagesBatch) { // The batch might be empty if you specified the maxDuration. if ($messagesBatch->isEmpty()) { continue; } // Do your magic $database->insert($messagesBatch->getMessages()); // ... and when you are done, commit the batch. $batchConsumer->commit($messagesBatch); }