Kafka源码——KafkaConsumer核心流程(version-0.10.1.0)
2022-01-30 11:02:32 26 举报
KafkaConsumer核心流程
作者其他创作
大纲/内容
completedReceives(处理完了的响应)List<NetworkReceive>
NetworkReceive receive 接收到的响应(处理拆包,先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小))
获取请求
发送请求,获取GroupCoordinator位置,发送心跳
5、下发消费方案给消费者
封装成completedFetch添加到缓存completedFetchs集合中
Consumer验证是否订阅消息,如果topic为空报错
得到返回响应
String idibroker的id
存储偏移量
发送请求
Broker1
封装出来一台服务器,coordinator服务器信息
nextInLineRecords = parseFetchedData(completedFetch);
clusterResourceListeners消费者组拦截器
maybePrepareRebalance(group)
发送分区消费方案给coordinator
responseQueues(response.processor).put(response) for(onResponse <- responseListeners) onResponse(response.processor)
一个分区一个分区去读取的
获取到对应的segment对象
尝试修改ISR列表
如果开启自动提交偏移量,判断是否超时,则异步提交
拉取消息
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
唤醒KafkaClient 发送网络请求
从本地的磁盘里面去读取日志信息
拿到返回相应
1、确保当前是AUTO_TOPICS或者AUTO_PATTERN订阅模式确报coordinator创建完成,如果没有创建向服务器发送请求开始创建
当成功找到coordinator后,那么就要发起加入group的请求,表示该customer是该组的成员,coordinator会收到该请求会选举一个customer的leader
JoinGroupResponseHandler
最终的更新所有的replica的LEO的值
如果当前获得消息的PartitionRecords是为空或者已经拉起完成,则需要从completedFetches队列重新获取completedFetche并解析成PartitionRecords
获取到请求
val messageSetSize = readResult.info.messageSet.sizeInBytes
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
3、制定消费方案
SocketChannel
响应
subscriptions.partitionsAutoAssigned() && coordinatorUnknown()
client.pollNoWakeup();
ensureCoordinatorReady()
获取到所有的replica
clientId客戶端ID
Fetcher 拉取消息
selector
创建SyncGroupRequest封装消费方案
java.nio.channels.Selector nioSelectorjavaNIO里面的Selector,Selector是负责网络的建立,发送网络请求,处理实际的网络IO
找到GroupCoordinator并连接
handleHeartbeatRequest(request)
KafkaConsumer
6、按照消费者方案从leader partition所在的主机消费数据
if (needRejoin())
获取最大的数目
acquire();
pollOnce(timeout);
Send(实现类ByteBufferSend) 发送出去的请求
心跳业务处理
poll()
heartbeat.sentHeartbeat(now);
client.poll(future);
leader partition维护了这个partition的所有的replica的LEO值
log.read
startHeartbeatThreadIfNeeded();
把响应存入到了一个队列里面
调用read方法
回调
发送网络请求
fetchedRecords()
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
ConsumerNetworkClient
onSuccess
Broker3
group.id消费者组ID
topic1 leader partition0
fetchResponseCallback(0)
添加memberId
Node node = this.client.leastLoadedNode();
Kafka Server---ApiKeys.GROUP_COORDINATOR(Coordinator选举)
GroupID
1、注册
var entry = segments.floorEntry(startOffset)
如果设置的是自动commit,会周期性的异步提交offset
ByteBuffer buffer数据内容(this.buffer = ByteBuffer.allocate(receiveSize))
GroupCoordinatorResponseHandler
ConsumerCoordinator 消费者组管理
发送joinGroup请求
制定分区方案
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
replicaManager.fetchMessages(×××)
获取到leader partition
Kafka Server---ApiKeys.JOIN_GROUPleader consumer选举
while (recordsRemaining > 0)
就要更新ISR列表
2、consumer加入缴费者组
completedFetch != nullnextInLineRecords 是拉取消息的本地缓存数据,缓存数据中completedFetche并解析成PartitionRecords
joinGroupIfNeeded();
返回响应给消费者
this.nioSelector.wakeup();
把封装的请求暂时缓存
NetworkClient管理broker的连接状态
fetcher.sendFetches();
在超时时间内不断轮询
ensureCoordinatorReady();
原生NOI
核心逻辑
GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
trySend(now);
drained存储拉取的分区数据
组件调用
见:Kafka源码——KafkaProducer核心流程(version-0.10.1.0)
heartbeat消费者心跳检测
封装coordinator信息
N
发送一个注册的请求
执行完成
循环消费数据
sendResponseCallback
handle
doAutoCommitOffsetsAsync();
Kafka Server---ApiKeys.OFFSET_COMMIT(自动提交偏移量)
if (leaderId == null)
更新follower的LEO的值
调用副本管理器以追加组消息,将消息追加到分区的leader副本,并等待他们被复制到其他副本;* callback函数将在超时或满足所需ack时触发
this.subscriptions.hasNoSubscriptionOrUserAssignment()
通过segment读取磁盘上面的数据
初始化KafkaConsumer
SelectionKey
最终的更新的代码
制定分区方案把分区方案发送给coondinator
heartbeatThread = new HeartbeatThread(); heartbeatThread.start();
consumer.subscribe(Collections.singletonList(this.topic));
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
Main线程
从缓存拉取数据返回数据
handleJoinGroupRequest(request)
List<String> disconnected没有建立连接的主机
拉取消息成功响应
ensureActiveGroup();
CompletedFetch completedFetch = completedFetches.poll();
就是谁先注册上来,谁就是leader consumer
val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
return found;
为消费者分配一个memberId
Consumer
获取数据
findCoordinatorFuture = sendGroupCoordinatorRequest(node);
获取到leader partition的HW的值
leastLoadedNode()
给coonrdinator提交
int offset = this.randOffset.nextInt(nodes.size());
通过log对象去读取数据
this.selector.wakeup();
lookupCoordinator()
autoCommitEnabled配置是否进行自动提交偏移的标志位
logEndOffset = logReadResult.info.fetchOffsetMetadata
Selector基于java NIO里面的selector去封装的
leader
ConsumerCoordinator
Kafka Server---ApiKeys.HEARTBEAT(Consumer向Coordinator发送心跳)
add 入队
group synchronized(加锁)进行leader customer选举
服务端拉取数据
metricConfig監控配置
List<String> connected完成建立连接的主机
是否要做负载均衡
生成一个MemberMetadata
2、指定Leader Consumer
extends
随机找一个连接的节点
client.wakeup();
Set<SelectionKey> immediatelyConnectedKeys
订阅了主题
ConsumerNetworkClient基于NetworkClient的封装
updateIsr(newInSyncReplicas)
records = fetcher.fetchedRecords();
consumer = new KafkaConsumer<>(props)
完成对之前调用还未处理的请求
封装响应
delayedOffsetStore.foreach(groupManager.store)
AbstractCoordinator包含了组件的管理功能
maybeAutoCommitOffsetsAsync(now);
Fetcher拉取消息
for(;;)
更新对应的consumer的上一次的心跳时间
分区算法
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
//时间轮机制,把往时间轮里面插入一个任务,这个任务就是用来检查心跳是否超时的。 //11:00:00 心跳 11:00:30 //去看最后30秒是否有心跳超时的。
ConsumerCoordinator消费者组管理
KafkaConsumer是线程不安全的同时只能允许一个线程执行,这里判断如果有多线程同时使用一个KafkaConsumer会抛出来异常
transportLayer封装SocketChannel,实现类PlaintextTransportLayer
val memberId = clientId + \"-\" + group.generateMemberIdSuffix
相同的消费者和ID
return sendSyncGroupRequest(request);
设置心跳的时间
val group = groupManager.addGroup(new GroupMetadata(groupId))
coordinator.poll(time.milliseconds());
Broker2
实现方法
handleOffsetCommitRequest(request)
val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
return client.leastLoadedNode(time.milliseconds());
joinFuture = sendJoinGroupRequest();
ByteBuffer[] buffers(write的数据)
随机值
handleFetchRequest()
最后一步如果拉取到了消息,接下来会再拉取一次消息,这样做的目的是提高网络IO,把消息拉取请求发出去,在网络IO的同时,消息数据返回给consumer的调用者进行业务处理,这样做到了并行处理,提供效率
member.latestHeartbeat = time.milliseconds()
心跳检测
topic1 leader partition1
里面就是发送提交偏移量的请求
开启心跳线程
poll
触发执行注册的监听offset提交
最后返回的就是这样的一个对象,FileMessageSet,里面封装的就是我们要读取的信息
metadata元数据
joinResponse.isLeader()
初始化joinGroup请求
Kafka Server---ApiKeys.FETCH(消费者拉取数据/Leader、Follower同步数据)
invokeCompletedOffsetCommitCallbacks()
future.complete(null);
client.pollNoWakeup();
client.poll(future);
实例化
缓存Group元数据信息
valueDeserializervalue的发序列化
获取对应的groupCoordinator的分区
replicaManager.appendMessages把接收到的数据追加到磁盘上面
Kafka 集群
发送此刻所有可以发送的请求
开始创建计算出来哪台服务器是coondinator服务器了
onJoinLeader(joinResponse).chain(future);
发送的是GROUP_COORDINATOR的请求 Kafkaapis
4、发送给ConsumerCoordinator
val leaderHW = leaderReplica.highWatermark
调用KafkaClient做进一步的网络交互
firePendingCompletedRequests();
val replica = getReplica(replicaId).get
发送SYNC_GROUP请求
返回__consumer_offsets对应的partition元数据信息
leastLoadedNode()
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
封裝响应 node信息
如果memberId是UNKNOWN_MEMBER_ID表示customer是一个新生的customer,执行添加成员并进行balance操作
TransportLayer transportLayer封装了SocketChannel、SelectionKey
Y如果上一次分区缓存中的数据已经拉取完,直接中断本次循环拉取,并返回空的消息列表,直至有缓存数据为止
sendFetches
网络入口
消费数据
int recordsRemaining = maxPollRecords;
发送ApiKeys.FETCH请求
Reactor封装
coordinator来处理提交偏移量
List<String> failedSends建立连接失败的主机
判断是否已经找到Coordinator
RequestFuture<ByteBuffer> future = initiateJoinGroup();
keyDeserializerkey的反序列化
onSuccess()
HeartbeatThread.run
在发送请求
第一次进来的时候leaderid可定是null的
ByteBuffer size其实就是一个int类型的大小(int receiveSize = size.getInt())
如果缓存没数据发送网络请求从服务端拉取数据
NetworkClient(KafkaClient 管理broker的连接状态)
autoCommitIntervalMs每隔多久提交一次偏移量信息(消费offset)
handleGroupCoordinatorRequest
0 条评论
下一页