spartaques/phpcorekafka

此包最新版本(v18.0.0)没有提供许可证信息。

php rdkafka 的包装器

v18.0.0 2022-06-03 08:39 UTC

This package is auto-updated.

Last update: 2024-09-30 01:26:57 UTC


README

这个库是一个 纯PHP 的包装器,用于 https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html。它已经在 https://kafka.apache.org/ 上进行了测试。

这个库被用于简化PHP项目的开发工作,并处理常见用例。它受到了 https://github.com/php-amqplib/php-amqplib 的启发。

项目负责人

https://github.com/Spartaques

支持的 Kafka 版本

Kafka 版本 >= 0.9.0.0

设置

确保您已安装 composer,然后运行以下命令

composer require spartaques/phpcorekafka

主题

在为您的应用程序创建主题之前,有一些事情您应该了解。

  1. 消息排序
  2. 副本因子
  3. 分区数量

关于排序: 需要保持固定顺序的任何事件都必须进入同一个主题(并且它们还必须使用相同的分区键)。因此,作为经验法则,我们可以这样说,所有关于同一实体的所有事件都需要进入同一个主题。下面将描述分区键。

副本因子应用于实现容错。

分区是Kafka中的扩展单元,因此我们必须了解我们的数据并计算出适当的数量。

生产

关于生产,最重要的几点是

  1. 模式(同步、异步、fire & forget)
  2. 配置
  3. 消息顺序
  4. 负载模式

1 因为Kafka默认使用异步模式,如果代理出现错误,我们可能会丢失一些消息。为了避免这种情况,我们可以简单地等待从代理那里获得响应。

examples/producesync.php 描述了同步生产。我们简单地使用超时来生产消息,这意味着我们等待来自服务器的响应。

examples/produce_async.php 描述了异步生产。我们只是使用0作为poll的超时值。

在使用异步模式时,不要忘记调用flush()以确保所有事件都已发布。这与PHP有关,因为PHP在每个请求后都会死亡,因此可能会丢失一些事件。

2 我们应该了解我们需要从生产者和代理那里得到什么。所有东西都可以配置以获得最佳结果。

生产者最重要的配置参数

acks - acks参数控制有多少个分区副本必须接收到记录,然后生产者才能认为写入成功。此选项对消息丢失的可能性有重大影响。

buffer.memory - 这设置生产者将用于缓冲等待发送到代理的消息的内存量。

compression.type - 默认情况下,消息是未压缩的。可以将此参数设置为snappy、gzip或lz4,在这种情况下,相应的压缩算法将在发送到代理之前压缩数据。

重试次数 - 当生产者从服务器接收到错误消息时,错误可能是瞬时的(例如,某个分区的领导者缺失)。在这种情况下,重试参数的值将控制生产者在放弃并通知客户端问题之前将重试发送消息的次数。

等等...

3 Kafka使用排序策略(分区器)将消息发送到分区,这取决于使用场景。https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md描述了处理此情况的分区器参数。

示例:examples/produceorder.php

提示:始终使用RD_KAFKA_PARTITION_UA作为分区号。Kafka将处理一切。

在大多数情况下,顺序并不重要。但如果我们,例如,使用kafka存储客户端,客户端地址信息在一般信息之前是不合适的。

因此,在这种情况下,我们有两种选择

  1. 仅使用1个分区和1个消费者,因此消息将按一个顺序处理。
  2. 使用多个分区和消费者,但与某些实体相关的消息应始终发送到同一个分区。Kafka默认通过一致性哈希和分区器处理这种情况。(例如,使用Rabbitmq时,您应该自己编写这个逻辑)。

4 消息有效负载架构应该是干净和简单的,并且仅包含使用的数据。

对于更复杂的架构,请使用https://avro.apache.org/序列化器。

消费

  1. 提交

  2. 重新平衡

  3. 配置

1 根据应该处理的数据,我们可以选择适合我们的行为。

如果重复或丢失不是问题,使用自动提交(默认情况下工作)将是明智的决定。

当我们想避免这种行为时,我们应该使用手动提交。

手动提交仅在将enable.auto.commit设置为false时才起作用,并且有两种模式

  1. 同步 - 在处理消息后提交最后一个偏移量。示例:examples/consumesynccommit.php

  2. 异步 - 在处理消息后非阻塞地提交最后一个偏移量。示例:examples/consume_async_commit.php

  3. 自动提交(默认)- examples/consumeautocommit.php

2 重新平衡是将分区重新分配给可用消费者的过程。它从消费者领导者开始,当它在一段时间内没有收到某个消费者的心跳时。当我们使用手动提交模式时,我们应该在重新平衡开始之前提交偏移量。

示例:src/Common/DefaultCallbacks.php , syncRebalance()。

3 配置

最重要的配置参数

enable.auto.commit - 此参数控制消费者是否会自动提交偏移量,默认为true。

fetch.min.bytes - 此属性允许消费者指定它希望从代理程序获取记录时的最小数据量。

fetch.max.wait.ms - 通过设置fetch.min.bytes,您告诉Kafka在响应消费者之前等待,直到它有足够的数据可以发送。

max.partition.fetch.bytes - 此属性控制服务器返回每个分区的最大字节数。默认为1 MB。

session.timeout.ms - 消费者可以不与代理程序保持联系,同时仍然被视为活跃的时间量,默认为3秒。

auto.offset.reset - 此属性控制消费者在开始读取它没有提交偏移量的分区或如果提交的偏移量无效时的行为(通常因为消费者已经关闭了很长时间,以至于具有该偏移量的记录已经从代理程序中过时)。

max.poll.records - 此参数控制对poll()的单个调用将返回的最大记录数。

receive.buffer.bytessend.buffer.bytes - 这些是在套接字写入和读取数据时使用的TCP发送和接收缓冲区的大小。

更多信息: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

在生产中使用

应该处理两个问题

  1. kafka
  2. 服务器上代码能够正常运行。

在一种情况下,我们必须处理所有错误、记录和分析。为此,我们可以注册回调函数

并使用自定义回调函数推送一些通知。例如

src/Common/DefaultCallbacks.php 中的 error() 方法。

在第二种情况下,我们应该使用某些进程管理器,即 supervisor 来监控我们的进程,并确信我们的消费者能够优雅地处理信号和退出(关闭连接)。

与框架一起使用

此库与框架无关。