ccinn/kafka-swoole

This package is abandoned and no longer maintained. No replacement package was suggested.
The latest version of this package (dev-master) has no license information available.

A Kafka client for PHP

Installs: 5

Dependents: 0

Suggesters: 0

Security: 0

Stars: 34

Watchers: 3

Forks: 7

Open Issues: 0

Type: application

dev-master 2020-12-24 06:07 UTC

This package is auto-updated.

Last update: 2021-08-24 07:42:54 UTC


README

🌈 The first PHP Kafka client to support multiple compression formats

Implements all Kafka protocols, provides separate 'HighLevel' and 'LowLevel' client APIs, and uses Swoole coroutines to run and flexibly scale the consumer client.

If you would like to help speed up development by contributing code, please contact me at 471113744@qq.com.

Core framework: kafka-swoole-core

Installation

Via Composer

version=dev-master;composer create-project ccinn/kafka-swoole kafka-swoole ${version}
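
Equivalently, you can pass the version to composer directly instead of going through a shell variable, e.g. for the dev-master branch shown above:

composer create-project ccinn/kafka-swoole kafka-swoole dev-master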

Via Git

git clone https://github.com/whiteCcinn/kafka-swoole.git && cd kafka-swoole && composer install

Docker

docker build -t kafka-swoole:latest .
docker run -it --name kafka-swoole -v $(pwd):/data/www kafka-swoole:latest bash
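
Inside the container, the usual flow is to install dependencies and start the consumer. A minimal sketch, assuming the mounted /data/www is the working directory you want to run from:

cd /data/www && composer install && php bin/kafka-client start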

Screenshots

A member of the consumer group

Topic:caiwenhui	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: caiwenhui	Partition: 0	Leader: 1004	Replicas: 1004	Isr: 1004

(screenshot: kafka-client)

(screenshot: kafka-client-2)

Multiple members of the consumer group

Topic:kafka-swoole	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: kafka-swoole	Partition: 0	Leader: 1003	Replicas: 1003,1002	Isr: 1003,1002
	Topic: kafka-swoole	Partition: 1	Leader: 1004	Replicas: 1004,1003	Isr: 1004,1003
	Topic: kafka-swoole	Partition: 2	Leader: 1001	Replicas: 1001,1004	Isr: 1001,1004
	Topic: kafka-swoole	Partition: 3	Leader: 1002	Replicas: 1002,1001	Isr: 1001,1002
  • KAFKA_CLIENT_CONSUMER_NUM=2
  • KAFKA_CLIENT_CONSUMER_NUM=4

(screenshot: kafka-client-3)

Commands

Produce

php bin/kafka-client kafka.produce [options] [--] <message>

php bin/kafka-client kafka.produce --help

Description:
  Send a message

Usage:
  kafka.produce [options] [--] <message>

Arguments:
  message                      The message you wish to send.

Options:
  -t, --topic[=TOPIC]          Which is the topic you want to send?
  -p, --partition[=PARTITION]  Which is the topic you want to send to partition?
  -k, --key[=KEY]              Which is the topic you want to send to partition by key?
  -h, --help                   Display this help message
  -q, --quiet                  Do not output any message
  -V, --version                Display this application version
      --ansi                   Force ANSI output
      --no-ansi                Disable ANSI output
  -n, --no-interaction         Do not ask any interactive question
  -v|vv|vvv, --verbose         Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  This command will help you send separate messages to a topic.
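
For example, a hedged invocation that sends a single message to partition 0 of a topic (the topic name and message are illustrative):

php bin/kafka-client kafka.produce -t kafka-swoole -p 0 "hello kafka-swoole"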

Describe groups

View consumer group details

Description:
  See consumer group details

Usage:
  kafka.describeGroups [options]

Options:
  -t, --topic=TOPIC     Which topic is subscribed by the consumer group?
  -g, --group=GROUP     Which consumer group?
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  See consumer group details...
php bin/kafka-client kafka.describeGroups -t mulog_clean_24 -g kafka-swoole

DescribeGruops-BaseInfo
=======================

 -------------- ------------ -------------- --------------
  groupId        groupState   protocolType   protocolData
 -------------- ------------ -------------- --------------
  kafka-swoole   Stable       consumer       Range
 -------------- ------------ -------------- --------------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-44857c49-b019-439b-90dd-d71112b2c01e   kafka-swoole   /192.167.8.2   mulog_clean_24   0,1
 --------------------------------------------------- -------------- -------------- ---------------- -----------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-5714cd77-a0dd-4d29-aa20-718f9d713908   kafka-swoole   /192.167.8.2   mulog_clean_24   2,3
 --------------------------------------------------- -------------- -------------- ---------------- -----------

RPC

Runtime data can be retrieved in real time over an RPC protocol, using AF_UNIX inter-process communication.

php bin/kafka-client rpc -h
Description:
  Built-in runtime RPC command

Usage:
  rpc <type>

Arguments:
  type                  which you want to execute command?

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
  The following are the built-in RPC command options:
  kafka_lag
  offset_checker
  block_size
  member_leader
  metadata_brokers
  metadata_topics

rpc

  • kafka_lag (checks the total lag between the current offsets and Kafka's maximum offsets)
php bin/kafka-client rpc kafka_lag
1000
  • offset_checker (shows, for each topic partition, the current offset, the maximum offset on the Kafka broker, and the remaining difference)
php bin/kafka-client rpc offset_checker
 -------------- ----------- ---------------- ------------------ -----------------
  topic          partition   current-offset   kafka-max-offset   remaining-count
 -------------- ----------- ---------------- ------------------ -----------------
  kafka-swoole   2           50223            50223              0
  kafka-swoole   3           70353            70353              0
  kafka-swoole   0           52395            52395              0
  kafka-swoole   1           50407            50407              0
 -------------- ----------- ---------------- ------------------ -----------------
  • block_size (if you use a storage medium in indirect mode, this command shows how many messages it currently holds)
php bin/kafka-client rpc block_size
254
  • member_leader (shows the leader of the current consumer group)
php bin/kafka-client rpc member_leader
 ---------------------------------------------------
  consumer-group-leaderId
 ---------------------------------------------------
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------

 ---------------------------------------------------
  consumer-group-membersId
 ---------------------------------------------------
  kafka-swoole-6080eb8e-3bfb-4be0-a923-037bb99a2666
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------
  • metadata_brokers (shows the available brokers of the Kafka service)
php bin/kafka-client rpc metadata_brokers
 --------- --------- ------
  node-id   host      port
 --------- --------- ------
  1003      mkafka3   9092
  1004      mkafka4   9092
  1001      mkafka1   9092
  1002      mkafka2   9092
 --------- --------- ------
  • metadata_topics (shows details of the subscribed topics)
php bin/kafka-client rpc metadata_topics
 -------------- ----------- ----------- --------------- -----------
  topic          partition   leader-id   replica-nodes   isr-nodes
 -------------- ----------- ----------- --------------- -----------
  kafka-swoole   2           1001        1001,1004       1001,1004
  kafka-swoole   1           1004        1004,1003       1004,1003
  kafka-swoole   3           1002        1002,1001       1002,1001
  kafka-swoole   0           1003        1003,1002       1002,1003
 -------------- ----------- ----------- --------------- -----------

Consumer

php bin/kafka-client start

Configuration

Common

Options

FILE: config/common.yaml

# Your kafka version
kafka.version: 0.9.0.0
# This is for bootstrapping and the producer will only use it for getting metadata
# (topics, partitions and replicas). The socket connections for sending the actual data
# will be established based on the broker information returned in the metadata. The
# format is host1:port1,host2:port2, and the list can be a subset of brokers or
# a VIP pointing to a subset of brokers.
metadata.broker.list: "mkafka1:9092,mkafka2:9092,mkafka3:9092,mkafka4:9092"

# The producer generally refreshes the topic metadata from brokers when there is a failure
# (partition missing, leader not available...). It will also poll regularly (default: every 10min
# so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
# If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
# Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
# a message the metadata is never refreshed
topic.metadata.refresh.interval.ms: 60 * 10 * 1000

# a string that uniquely identifies a set of consumers within the same consumer group
group.id: "kafka-swoole"

# The topics you want to operate on
# Example: topic1,topic2,topic3,topic4,topic5
topic.names: "kafka-swoole"

# The interval at which heartbeat requests are sent to tell Kafka that the client is still alive
heartbeat.interval.ms: 10 * 1000

# The maximum allowed time within a group for which no heartbeat is sent
# So this value must be greater than the heartbeat interval: heartbeat.interval.ms
# (default: 30s)
group.keep.session.max.ms: 30 * 1000

# Select a strategy for assigning partitions to consumer streams. See ProtocolPartitionAssignmentStrategyEnum::class
# Range:
# RoundRobin:
# Sticky:
partition.assignment.strategy: "Range"

# (High-level API) the frequency in ms at which consumer offsets are committed to ZooKeeper
auto.commit.interval.ms: 10 * 1000

# smallest : automatically reset the offset to the smallest offset
# largest : automatically reset the offset to the largest offset
auto.offset.reset: largest
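
In practice you mainly adjust the broker list, group id, and subscribed topics; an illustrative fragment (values are examples, and the remaining keys stay as documented above):

metadata.broker.list: "127.0.0.1:9092"
group.id: "my-consumer-group"
topic.names: "topic1,topic2"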

Supported compression protocols

  • normal (no compression)
  • gzip
  • snappy (if you need Snappy-compressed data, you must additionally install the php-ext-snappy extension; see the build sketch after this list)
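
A hedged sketch of building php-ext-snappy from source; the repository URL is an assumption and the steps are the standard phpize workflow, so adapt them to however you obtain the extension:

git clone --recursive https://github.com/kjdev/php-ext-snappy.git
cd php-ext-snappy
phpize && ./configure && make && make install
# then enable it in php.ini: extension=snappy.so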

Protocols

  • Produce
  • Fetch
  • ListOffsets
  • Metadata
  • LeaderAndIsr
  • StopReplica
  • UpdateMetadata
  • ControlledShutdown
  • OffsetCommit
  • OffsetFetch
  • FindCoordinator
  • JoinGroup
  • Heartbeat
  • LeaveGroup
  • SyncGroup
  • DescribeGroups
  • ListGroups
  • SaslHandshake
  • ApiVersions
  • CreateTopics
  • DeleteTopics
  • DeleteRecords
  • InitProducerId
  • OffsetForLeaderEpoch
  • AddPartitionsToTxn
  • AddOffsetsToTxn
  • EndTxn
  • WriteTxnMarkers
  • TxnOffsetCommit
  • DescribeAcls
  • CreateAcls
  • DeleteAcls
  • DescribeConfigs
  • AlterConfigs
  • AlterReplicaLogDirs
  • DescribeLogDirs
  • SaslAuthenticate
  • CreatePartitions
  • CreateDelegationToken
  • RenewDelegationToken
  • ExpireDelegationToken
  • DescribeDelegationToken
  • DeleteGroups
  • ElectPreferredLeaders
  • IncrementalAlterConfigs

Usage

The idea here is to start from the API protocol rather than the client API. For example, here we issue a ListOffsetsRequest.
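
// A couple of assumptions for the snippet below: the relevant Kafka\Protocol request,
// response, and type classes are imported, and $socket is an already established
// connection (e.g. a Swoole TCP client) to one of the Kafka brokers.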

$protocol = new ListOffsetsRequest();
$partitions = [];
array_push($partitions,
    (new PartitionsListsOffsets())->setPartition(Int32::value(0))
                                  ->setMaxNumOffsets(Int32::value(10))
                                  ->setTimestamp(Int64::value(time()))
);
$topics = [];
array_push($topics,
    (new TopicsListsOffsets())->setTopic(String16::value('caiwenhui'))
                              ->setPartitions($partitions)
);
$protocol->setRequestHeader(
    (new RequestHeader())->setApiVersion(Int16::value(ProtocolVersionEnum::API_VERSION_0))
                         ->setClientId(String16::value('kafka-swoole'))
                         ->setCorrelationId(Int32::value(ProtocolEnum::LIST_OFFSETS))
                         ->setApiKey(Int16::value(ProtocolEnum::LIST_OFFSETS))
);
$protocol->setReplicaId(Int32::value(-1));
$protocol->setTopics($topics);

$payload = $protocol->pack();
$n = $socket->send($payload);

$data = $socket->recv();
$protocol->response->unpack($data);
var_dump($protocol->response,$protocol->response->toArray()); // Here you can see the response protocol of the kafka service
/*
object(Kafka\Protocol\Response\ListOffsetsResponse)#46 (3) {
  ["responses":"Kafka\Protocol\Response\ListOffsetsResponse":private]=>
  array(1) {
    [0]=>
    object(Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets)#68 (2) {
      ["topic":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      object(Kafka\Protocol\Type\String16)#72 (1) {
        ["value":protected]=>
        string(9) "caiwenhui"
      }
      ["partitionResponses":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      array(1) {
        [0]=>
        object(Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets)#71 (3) {
          ["partition":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int32)#79 (1) {
            ["value":protected]=>
            int(0)
          }
          ["errorCode":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int16)#78 (1) {
            ["value":protected]=>
            int(3)
          }
          ["offsets":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader":protected]=>
  object(Kafka\Protocol\Response\Common\ResponseHeader)#58 (1) {
    ["correlationId":"Kafka\Protocol\Response\Common\ResponseHeader":private]=>
    object(Kafka\Protocol\Type\Int32)#64 (1) {
      ["value":protected]=>
      int(2)
    }
  }
  ["size":protected]=>
  object(Kafka\Protocol\Type\Int32)#63 (1) {
    ["value":protected]=>
    int(33)
  }
}

array(3) {
  ["responses"]=>
  array(1) {
    [0]=>
    array(2) {
      ["topic"]=>
      string(9) "caiwenhui"
      ["partitionResponses"]=>
      array(1) {
        [0]=>
        array(3) {
          ["partition"]=>
          int(0)
          ["errorCode"]=>
          int(3)
          ["offsets"]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader"]=>
  array(1) {
    ["correlationId"]=>
    int(2)
  }
  ["size"]=>
  int(33)
}
*/
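
The errorCode of 3 in the dump above is UNKNOWN_TOPIC_OR_PARTITION in the Kafka protocol, which is why the offsets array is empty. A minimal sketch of checking the decoded array before trusting the offsets, based on the toArray() structure shown above:

$result = $protocol->response->toArray();
foreach ($result['responses'] as $topicResponse) {
    foreach ($topicResponse['partitionResponses'] as $partitionResponse) {
        // 0 means no error; anything else (e.g. 3 = UNKNOWN_TOPIC_OR_PARTITION) should be handled
        if ($partitionResponse['errorCode'] !== 0) {
            continue;
        }
        $offsets = $partitionResponse['offsets']; // the offsets returned for this partition
    }
}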

Environment

# APP
APP_NAME=kafka-swoole
## zh_CN,en_US
APP_LANGUGE=en_US

# SERVER
# The server only receives RPC requests that query data held in memory
SERVER_IP=127.0.0.1
SERVER_PORT=9501
SERVER_REACTOR_NUM=1
SERVER_WORKER_NUM=1
SERVER_MAX_REQUEST=50

# KAFKA_CLIENT_CONSUMER_NUM is adjusted dynamically based on the partitions of the subscribed topic
KAFKA_CLIENT_CONSUMER_NUM=2

# KAFKA_CLIENT
# Client Process
# KAFKA_CLIENT_API_MODE: "HIGH_LEVEL" / "LOW_LEVEL"
# KAFKA_CLIENT_CONSUMER_NUM: Must be less than the maximum partition in topic
KAFKA_CLIENT_API_MODE=LOW_LEVEL

# REDIS/FILE/DIRECTLY
# If you choose "DIRECTLY" mode, the number of logic-processing processes is, at minimum, equal to the number of kafka client processes,
# and the KAFKA_CUSTOM_PROCESS_NUM parameter is ignored.
# Make sure your consumption logic processes data as fast as possible, otherwise the consumption rate will fall behind the production rate.

# The process generated by KAFKA_CUSTOM_PROCESS_NUM gets messages from the storage medium
KAFKA_MESSAGE_STORAGE=REDIS

# Number of message processing processes
KAFKA_SINKER_PROCESS_NUM=2

# Which is your storage redis config
KAFKA_STORAGE_REDIS=POOL_REDIS_0

# Redis stores the persistent key
KAFKA_STORAGE_REDIS_KEY=${APP_NAME}:storage:redis:messages

# Redis persists the maximum number of messages
KAFKA_STORAGE_REDIS_LIMIT=40000

# Redis Pool
# `POOL_REDIS_NUM` is the number of Redis pools; pool indexing starts at 0
POOL_REDIS_NUM=1
POOL_REDIS_0_MAX_NUMBER=5
POOL_REDIS_0_MAX_IDLE=3
POOL_REDIS_0_MIN_IDLE=0
POOL_REDIS_0_HOST=mredis
POOL_REDIS_0_PORT=60379
POOL_REDIS_0_AUTH=uXUxGIyprkel1nYWhCyoCYAT4CNCUW2mXkVcDfhTqetnYSD7
POOL_REDIS_0_DB=0
# other redis config ...

Unit tests

Run from the project root directory.

php vendor/bin/phpunit tests/Protocol/
PHPUnit 7.5.16 by Sebastian Bergmann and contributors.

Runtime:       PHP 7.1.28
Configuration: /www5/kafka-swoole/phpunit.xml.dist

......................                                            22 / 22 (100%)

Time: 64 ms, Memory: 6.00 MB

OK (22 tests, 22 assertions)

References