ccinn/kafka-swoole
This package is abandoned and no longer maintained. No replacement package was suggested.
The latest version of this package (dev-master) has no license information available.
A Kafka client for PHP
dev-master
2020-12-24 06:07 UTC
Requires
- php: ^7.1
- ext-json: *
- ext-posix: *
- ext-sockets: *
- ccinn/kafka-swoole-core: dev-master
Requires (Dev)
- swoft/swoole-ide-helper: dev-master
This package is auto-updated.
Last update: 2021-08-24 07:42:54 UTC
README
🌈 The first php-kafka client to support multiple compression formats
Implements all Kafka protocols, provides separate 'HighLevel' and 'LowLevel' client APIs, and uses swoole coroutines to flexibly scale the consumer client.
If you would like to help me speed things up by contributing code, please contact me at this email: 471113744@qq.com
Core framework: kafka-swoole-core
Installation
Via composer
version=dev-master;composer create-project ccinn/kafka-swoole kafka-swoole ${version}
Via git
git clone https://github.com/whiteCcinn/kafka-swoole.git && cd kafka-swoole && composer install
docker
docker build -t kafka-swoole:latest .
docker run -it --name kafka-swoole -v $(PWD):/data/www kafka-swoole:latest bash
Rendering
Members of the consumer group
Topic:caiwenhui  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: caiwenhui  Partition: 0  Leader: 1004  Replicas: 1004  Isr: 1004
Multiple members of the consumer group
Topic:kafka-swoole  PartitionCount:4  ReplicationFactor:2  Configs:
    Topic: kafka-swoole  Partition: 0  Leader: 1003  Replicas: 1003,1002  Isr: 1003,1002
    Topic: kafka-swoole  Partition: 1  Leader: 1004  Replicas: 1004,1003  Isr: 1004,1003
    Topic: kafka-swoole  Partition: 2  Leader: 1001  Replicas: 1001,1004  Isr: 1001,1004
    Topic: kafka-swoole  Partition: 3  Leader: 1002  Replicas: 1002,1001  Isr: 1001,1002
- KAFKA_CLIENT_CONSUMER_NUM=2
- KAFKA_CLIENT_CONSUMER_NUM=4
Commands
Produce
php bin/kafka-client kafka.produce [options] [--] <message>
php bin/kafka-client kafka.produce --help
Description:
  Send a message
Usage:
  kafka.produce [options] [--] <message>
Arguments:
  message                       The message you wish to send.
Options:
  -t, --topic[=TOPIC]           Which is the topic you want to send?
  -p, --partition[=PARTITION]   Which is the topic you want to send to partition?
  -k, --key[=KEY]               Which is the topic you want to send to partition by key?
  -h, --help                    Display this help message
  -q, --quiet                   Do not output any message
  -V, --version                 Display this application version
      --ansi                    Force ANSI output
      --no-ansi                 Disable ANSI output
  -n, --no-interaction          Do not ask any interactive question
  -v|vv|vvv, --verbose          Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
  This command will help you send separate messages to a topic..
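For example, a one-off send could look like this (the topic, partition, key, and message values here are placeholders for illustration, not values the project requires):

php bin/kafka-client kafka.produce --topic=kafka-swoole --partition=0 --key=my-key "hello from kafka-swoole"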
DescribeGroups
View consumer group details
Description:
  See consumer group details
Usage:
  kafka.describeGroups [options]
Options:
  -t, --topic=TOPIC         Which topic is subscribed by the consumer group?
  -g, --group=GROUP         Which consumer group?
  -h, --help                Display this help message
  -q, --quiet               Do not output any message
  -V, --version             Display this application version
      --ansi                Force ANSI output
      --no-ansi             Disable ANSI output
  -n, --no-interaction      Do not ask any interactive question
  -v|vv|vvv, --verbose      Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
  See consumer group details...
php bin/kafka-client kafka.describeGroups -t mulog_clean_24 -g kafka-swoole

DescribeGruops-BaseInfo
=======================

 -------------- ------------ -------------- --------------
  groupId        groupState   protocolType   protocolData
 -------------- ------------ -------------- --------------
  kafka-swoole   Stable       consumer       Range
 -------------- ------------ -------------- --------------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-44857c49-b019-439b-90dd-d71112b2c01e   kafka-swoole   /192.167.8.2   mulog_clean_24   0,1
 --------------------------------------------------- -------------- -------------- ---------------- -----------

 --------------------------------------------------- -------------- -------------- ---------------- -----------
  memberId                                            clientId       clientHost     topcic           paritions
 --------------------------------------------------- -------------- -------------- ---------------- -----------
  kafka-swoole-5714cd77-a0dd-4d29-aa20-718f9d713908   kafka-swoole   /192.167.8.2   mulog_clean_24   2,3
 --------------------------------------------------- -------------- -------------- ---------------- -----------
Rpc
Runtime data can be fetched in real time; the client exposes it over an RPC protocol via AF_UNIX inter-process communication.
php bin/kafka-client rpc -h
Description:
Built-in runtime RPC command
Usage:
rpc <type>
Arguments:
type which you want to execute command?
Options:
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
The following are the built-in RPC command options:
kafka_lag
offset_checker
block_size
member_leader
metadata_brokers
metadata_topics
- kafka_lag (check the total lag between the current offsets and the maximum offsets in kafka)
php bin/kafka-client rpc kafka_lag
1000
- offset_checker (view the current offset, the offset in the kafka service, and the difference for each topic partition)
php bin/kafka-client rpc offset_checker

 -------------- ----------- ---------------- ------------------ -----------------
  topic          partition   current-offset   kafka-max-offset   remaining-count
 -------------- ----------- ---------------- ------------------ -----------------
  kafka-swoole   2           50223            50223              0
  kafka-swoole   3           70353            70353              0
  kafka-swoole   0           52395            52395              0
  kafka-swoole   1           50407            50407              0
 -------------- ----------- ---------------- ------------------ -----------------
- block_size (if you use a storage medium in indirect mode, this command shows the current number of entries in the storage medium)
php bin/kafka-client rpc block_size
254
- member_leader (view the leader of the current consumer group)
php bin/kafka-client rpc member_leader

 ---------------------------------------------------
  consumer-group-leaderId
 ---------------------------------------------------
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------

 ---------------------------------------------------
  consumer-group-membersId
 ---------------------------------------------------
  kafka-swoole-6080eb8e-3bfb-4be0-a923-037bb99a2666
  kafka-swoole-da43c9a0-b12d-46df-9941-ee80456ec9a2
 ---------------------------------------------------
- metadata_brokers (view the available brokers of the kafka service)
php bin/kafka-client rpc metadata_brokers

 --------- --------- ------
  node-id   host      port
 --------- --------- ------
  1003      mkafka3   9092
  1004      mkafka4   9092
  1001      mkafka1   9092
  1002      mkafka2   9092
 --------- --------- ------
- metadata_topics (view details of the subscribed topics)
php bin/kafka-client rpc metadata_topics

 -------------- ----------- ----------- --------------- -----------
  topic          partition   leader-id   replica-nodes   isr-nodes
 -------------- ----------- ----------- --------------- -----------
  kafka-swoole   2           1001        1001,1004       1001,1004
  kafka-swoole   1           1004        1004,1003       1004,1003
  kafka-swoole   3           1002        1002,1001       1002,1001
  kafka-swoole   0           1003        1003,1002       1002,1003
 -------------- ----------- ----------- --------------- -----------
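These rpc subcommands query the running client's in-memory state over the AF_UNIX IPC channel mentioned above. A minimal sketch of that style of socket client with ext-sockets follows; the socket path and the newline-terminated payload are illustrative assumptions, not the project's actual wire format.

<?php
// Minimal AF_UNIX client sketch (illustrative only).
// '/tmp/kafka-swoole.sock' and the newline-terminated payload are assumptions,
// not the real socket path or wire format used by kafka-swoole.
$sock = socket_create(AF_UNIX, SOCK_STREAM, 0);
if ($sock === false) {
    exit('socket_create failed: ' . socket_strerror(socket_last_error()) . PHP_EOL);
}

if (!@socket_connect($sock, '/tmp/kafka-swoole.sock')) {
    exit('socket_connect failed: ' . socket_strerror(socket_last_error($sock)) . PHP_EOL);
}

// Send a command name and print back whatever the server replies with.
socket_write($sock, "kafka_lag\n");
$reply = socket_read($sock, 8192);
echo $reply, PHP_EOL;

socket_close($sock);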
Consumer
php bin/kafka-client start
Configuration
Common
Options
FILE: config/common.yaml
# Your kafka version
kafka.version: 0.9.0.0

# This is for bootstrapping and the producer will only use it for getting metadata
# (topics, partitions and replicas). The socket connections for sending the actual data
# will be established based on the broker information returned in the metadata. The
# format is host1:port1,host2:port2, and the list can be a subset of brokers or
# a VIP pointing to a subset of brokers.
metadata.broker.list: "mkafka1:9092,mkafka2:9092,mkafka3:9092,mkafka4:9092"

# The producer generally refreshes the topic metadata from brokers when there is a failure
# (partition missing, leader not available...). It will also poll regularly (default: every 10min
# so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
# If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
# Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
# a message the metadata is never refreshed
topic.metadata.refresh.interval.ms: 60 * 10 * 1000

# a string that uniquely identifies a set of consumers within the same consumer group
group.id: "kafka-swoole"

# Which you want to operation
# Example: topic1,topic2,topic3,topic4,topic5
topic.names: "kafka-swoole"

# The periodic time the heartbeat request is sent to tell kafka that the client still exists
heartbeat.interval.ms: 10 * 1000

# The maximum allowed time within a group for which no heartbeat is sent
# So this value must be greater than the heartbeat sending cycle:heartbeat.interval.ms
# (default: 30s)
group.keep.session.max.ms: 30 * 1000

# Select a strategy for assigning partitions to consumer streams. See ProtocolPartitionAssignmentStrategyEnum::class
# Range:
# RoundRobin:
# Sticky:
partition.assignment.strategy: "Range"

# highLevelApi the frequency in ms that the consumer offsets are committed to zookeeper
auto.commit.interval.ms: 10 * 1000

# smallest : automatically reset the offset to the smallest offset
# largest : automatically reset the offset to the largest offset
auto.offset.reset: largest
Supported compression protocols
- normal
- gzip
- snappy (if you need snappy-compressed data, you must install the extra php-ext-snappy extension; see the sketch after this list)
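As a rough illustration of what those codecs involve on the PHP side, the sketch below compresses a payload with the built-in gzencode() and, when the extension is loaded, with snappy_compress() from php-ext-snappy. It shows only the raw codec calls, not the client's internal Kafka record/message-set packing.

<?php
// Illustrative codec calls only; kafka-swoole wraps these inside the Kafka
// message-set format rather than compressing bare strings like this.
$payload = str_repeat('kafka-swoole message ', 100);

// gzip: available via ext-zlib.
$gz = gzencode($payload);
printf("gzip:   %d -> %d bytes\n", strlen($payload), strlen($gz));

// snappy: only if the optional php-ext-snappy extension is installed.
if (function_exists('snappy_compress')) {
    $sn = snappy_compress($payload);
    printf("snappy: %d -> %d bytes\n", strlen($payload), strlen($sn));
} else {
    echo "snappy: php-ext-snappy not loaded\n";
}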
Protocols
- Produce
- Fetch
- ListOffsets
- Metadata
- LeaderAndIsr
- StopReplica
- UpdateMetadata
- ControlledShutdown
- OffsetCommit
- OffsetFetch
- FindCoordinator
- JoinGroup
- Heartbeat
- LeaveGroup
- SyncGroup
- DescribeGroups
- ListGroups
- SaslHandshake
- ApiVersions
- CreateTopics
- DeleteTopics
- DeleteRecords
- InitProducerId
- OffsetForLeaderEpoch
- AddPartitionsToTxn
- AddOffsetsToTxn
- EndTxn
- WriteTxnMarkers
- TxnOffsetCommit
- DescribeAcls
- CreateAcls
- DeleteAcls
- DescribeConfigs
- AlterConfigs
- AlterReplicaLogDirs
- DescribeLogDirs
- SaslAuthenticate
- CreatePartitions
- CreateDelegationToken
- RenewDelegationToken
- ExpireDelegationToken
- DescribeDelegationToken
- DeleteGroups
- ElectPreferredLeaders
- IncrementalAlterConfigs
Usage
The idea here is to start from the API protocols themselves rather than the client API. For example, here we issue a ListOffsetsRequest.
$protocol = new ListOffsetsRequest();

$partitions = [];
array_push($partitions,
    (new PartitionsListsOffsets())->setPartition(Int32::value(0))
                                  ->setMaxNumOffsets(Int32::value(10))
                                  ->setTimestamp(Int64::value(time()))
);

$topics = [];
array_push($topics,
    (new TopicsListsOffsets())->setTopic(String16::value('caiwenhui'))
                              ->setPartitions($partitions)
);

$protocol->setRequestHeader(
    (new RequestHeader())->setApiVersion(Int16::value(ProtocolVersionEnum::API_VERSION_0))
                         ->setClientId(String16::value('kafka-swoole'))
                         ->setCorrelationId(Int32::value(ProtocolEnum::LIST_OFFSETS))
                         ->setApiKey(Int16::value(ProtocolEnum::LIST_OFFSETS))
);
$protocol->setReplicaId(Int32::value(-1));
$protocol->setTopics($topics);

$payload = $protocol->pack();
$n = $socket->send($payload);
$data = $socket->recv();
$protocol->response->unpack($data);

var_dump($protocol->response, $protocol->response->toArray());

// Here you can see the response protocol of the kafka service
/*
object(Kafka\Protocol\Response\ListOffsetsResponse)#46 (3) {
  ["responses":"Kafka\Protocol\Response\ListOffsetsResponse":private]=>
  array(1) {
    [0]=>
    object(Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets)#68 (2) {
      ["topic":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      object(Kafka\Protocol\Type\String16)#72 (1) {
        ["value":protected]=>
        string(9) "caiwenhui"
      }
      ["partitionResponses":"Kafka\Protocol\Response\ListOffsets\ResponsesListOffsets":private]=>
      array(1) {
        [0]=>
        object(Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets)#71 (3) {
          ["partition":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int32)#79 (1) {
            ["value":protected]=>
            int(0)
          }
          ["errorCode":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          object(Kafka\Protocol\Type\Int16)#78 (1) {
            ["value":protected]=>
            int(3)
          }
          ["offsets":"Kafka\Protocol\Response\ListOffsets\PartitionsResponsesListOffsets":private]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader":protected]=>
  object(Kafka\Protocol\Response\Common\ResponseHeader)#58 (1) {
    ["correlationId":"Kafka\Protocol\Response\Common\ResponseHeader":private]=>
    object(Kafka\Protocol\Type\Int32)#64 (1) {
      ["value":protected]=>
      int(2)
    }
  }
  ["size":protected]=>
  object(Kafka\Protocol\Type\Int32)#63 (1) {
    ["value":protected]=>
    int(33)
  }
}
array(3) {
  ["responses"]=>
  array(1) {
    [0]=>
    array(2) {
      ["topic"]=>
      string(9) "caiwenhui"
      ["partitionResponses"]=>
      array(1) {
        [0]=>
        array(3) {
          ["partition"]=>
          int(0)
          ["errorCode"]=>
          int(3)
          ["offsets"]=>
          array(0) {
          }
        }
      }
    }
  }
  ["responseHeader"]=>
  array(1) {
    ["correlationId"]=>
    int(2)
  }
  ["size"]=>
  int(33)
}
*/
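The snippet above assumes an already-connected $socket. A minimal sketch of one way to open such a connection with Swoole's coroutine TCP client is shown below; the broker address and the length-check settings (Kafka prefixes every response with a 4-byte big-endian size) are illustrative assumptions, not the project's own Socket wrapper.

<?php
// Illustrative only: connect to a broker with Swoole's coroutine client.
// 'mkafka1:9092' matches the metadata.broker.list example above; adjust as needed.
\Swoole\Coroutine\run(function () {
    $socket = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
    $socket->set([
        // Kafka responses start with a 4-byte big-endian length, so let Swoole
        // re-assemble one complete response per recv() call.
        'open_length_check'     => true,
        'package_length_type'   => 'N',
        'package_length_offset' => 0,
        'package_body_offset'   => 4,
    ]);
    if (!$socket->connect('mkafka1', 9092, 3.0)) {
        echo "connect failed: {$socket->errCode}\n";
        return;
    }
    // ... build and pack a request as in the example above, then:
    // $socket->send($payload);
    // $data = $socket->recv();
    $socket->close();
});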
Environment
# APP
APP_NAME=kafka-swoole
## zh_CN,en_US
APP_LANGUGE=en_US

# SERVER
# The server just only receive rpc request select data in memory
SERVER_IP=127.0.0.1
SERVER_PORT=9501
SERVER_REACTOR_NUM=1
SERVER_WORKER_NUM=1
SERVER_MAX_REQUEST=50

# KAFKA_CLIENT_CONSUMER_NUM dynamically changes this parameter based on the partition of the subscribed topic
KAFKA_CLIENT_CONSUMER_NUM=2

# KAFKA_CLIENT
# Client Process
# KAFKA_CLIENT_API_MODE:"HIGH_LEVEL" / "LOW LEVEL"
# KAFKA_CLIENT_CONSUMER_NUM: Must be less than the maximum partition in topic
KAFKA_CLIENT_API_MODE=LOW_LEVEL

# REDIS/FILE/DIRECTLY
# If you choose "Directly" mode, the number of processing logical processes is equal to the minimum number of kafka client processes.
# The KAFKA_CUSTOM_PROCESS_NUM parameter is ignored.
# Make sure your consumption logic consumes as much data as possible, otherwise the rate of consumption will be lower than the rate of production.
# The process generated by KAFKA_CUSTOM_PROCESS_NUM gets messages from the storage medium
KAFKA_MESSAGE_STORAGE=REDIS

# Number of message processing processes
KAFKA_SINKER_PROCESS_NUM=2

# Which is your storage redis config
KAFKA_STORAGE_REDIS=POOL_REDIS_0

# Redis stores the persistent key
KAFKA_STORAGE_REDIS_KEY=${APP_NAME}:storage:redis:messages

# Redis persists the maximum number of messages
KAFKA_STORAGE_REDIS_LIMIT=40000

# Redis Pool
# `POOL_REDIS_NUM` is number,which begin offset is 0
POOL_REDIS_NUM=1
POOL_REDIS_0_MAX_NUMBER=5
POOL_REDIS_0_MAX_IDLE=3
POOL_REDIS_0_MIN_IDLE=0
POOL_REDIS_0_HOST=mredis
POOL_REDIS_0_PORT=60379
POOL_REDIS_0_AUTH=uXUxGIyprkel1nYWhCyoCYAT4CNCUW2mXkVcDfhTqetnYSD7
POOL_REDIS_0_DB=0

# other redis config ...
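As a rough illustration of how the pool section maps to connection settings, the sketch below reads the POOL_REDIS_0_* variables with getenv() and resolves the ${APP_NAME} placeholder in the storage key. The helper is hypothetical and not the project's actual bootstrap code.

<?php
// Hypothetical helper: turn the POOL_REDIS_0_* variables above into a config array.
function redisPoolConfig(string $pool): array
{
    return [
        'host'       => getenv("{$pool}_HOST"),
        'port'       => (int) getenv("{$pool}_PORT"),
        'auth'       => getenv("{$pool}_AUTH"),
        'db'         => (int) getenv("{$pool}_DB"),
        'max_number' => (int) getenv("{$pool}_MAX_NUMBER"),
    ];
}

$pool = getenv('KAFKA_STORAGE_REDIS') ?: 'POOL_REDIS_0';

// Resolve the ${APP_NAME} placeholder used in KAFKA_STORAGE_REDIS_KEY.
$storageKey = preg_replace_callback(
    '/\$\{(\w+)\}/',
    function (array $m) {
        return (string) getenv($m[1]);
    },
    getenv('KAFKA_STORAGE_REDIS_KEY') ?: ''
);

var_dump($pool, redisPoolConfig($pool), $storageKey);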
Unit tests
Run the tests from the project root directory.
php vendor/bin/phpunit tests/Protocol/
PHPUnit 7.5.16 by Sebastian Bergmann and contributors.

Runtime:       PHP 7.1.28
Configuration: /www5/kafka-swoole/phpunit.xml.dist

......................                                            22 / 22 (100%)

Time: 64 ms, Memory: 6.00 MB

OK (22 tests, 22 assertions)