Kafka Consumer 消费架构
2022-06-16 16:28:14 0 举报
Kafka Consumer 消费架构
作者其他创作
大纲/内容
Selector.setSend
groupCoordinator.doSyncGroup
ConsumerNetworkClient.trySend
appendForGroup
是否 leader
Kafka Consumer
onJoinComplete
ConsumerCoordinator
ConsumerCoordinator.ensureCoordinatorReady
MemberMetadata :memberId:groupId
同一分组下的所有消费者实例
onJoinFollower
分配结果同步到broker
Selector poll 触发发送请求
ConsumerCoordinator.lookupCoordinator
创建内部主题__consumer_offsetsgetOrCreateInternalTopic
pollForFetches
自动提交offset
broker_3
coordinatorhost/ port
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCountgroupMetadataTopicPartitionCount 默认50
JoinGroupResponseHandler
send
是
UnsentRequests span style=\"font-size: inherit;\
broker_2
addGroup
KafkaApis.handleSyncGroupRequest
OffsetCommitRequestData
根据 destination 获取,或者建立channel kafkaChannel
订阅分区校验
KafkaConsumer
tryConnect
responseCallback回调返回结果
maybeAutoCommitOffsetsAsync
doCommitOffsetsAsync
创建新的Group
KafkaApis.handleFetchRequest
若 groupmetadata 中,leaderId 为空,则当前申请加入分组的客户端当选为 leader
onJoinLeader
groupCoordinator.handleSyncGroup
consumer_offset 分区所在的broker
获取partition leader 所在的 broker
Selector.send
同步分组
commitOffsetsAsync
leaderIdmemberIdmembers.asJava
校验异常
返回加入分组后信息,包括是否是 leader
sendFindCoordinatorRequest
加入分组joinGroupIfNeeded
sendSyncGroupRequest
1:n
分区计算方法
计算consumer 所属的分区
拉取消息
元数据缓存
performAssignment
KafkaApis.handleOffsetCommitRequest
client 连接 coordinator
sendOffsetCommitRequest
获取订阅分配列表
否
Kafka Broker
分配的分区
GroupMetadataManager:groupMetadataCache
连接 partition 所在的 broker
consumer 根据分配策略,计算当前分组下,主题分区的监听分配
KafkaApis.handleJoinGroupRequest
GroupMetadata:groupId
KafkaApis.handleFindCoordinatorRequest
KafkaConsumer.sendFetches
broker_1
分区所在leader的节点 Node,请求的参数 FetchRequest,存入到 UnsentRequests
0 条评论
下一页