kafka cosumer架构
2020-08-25 10:31:44 0 举报
AI智能生成
kafka consumer架构
作者其他创作
大纲/内容
组件
poll()
通过acquireAndEnsureOpen()确保本对象是单线程进入,这是因为KafkaConsumer非线程安全
检查是否订阅了topic
检查是否订阅了topic
在主循环中通过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是因为上一轮次拉取做了提前拉取处理。有可能已经拉取回消息等待处理。如果没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。
如果上一步拉取到消息,并不会立即返回,而是再一次触发消息拉取,并且使用的是非阻塞方式,调用client.pollNoWakeup()。这样做的目的是,提前网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样做到了并行处理,提高了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提前拉取到的消息,从而省去了网络IO时间。
kafka拉取消息图例
pollForFetches()
查看是否已经存在拉取回来未加工的消息原始数据,有的话立即调用fetcher.fetchedRecords()加工,然后返回
如果没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求
通过ConsumerNetworkClient发送拉取请求
加工拉取回的原始数据,返回
协调器(用来代替Zookeeper)
背景:
kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。
协调器
Server端:GroupCoordinator
每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)
Consumer端:CosumerCoordinator
每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信
GroupCoordinator
(每个Broker启动时都会实例化一个)
(每个Broker启动时都会实例化一个)
负责工作
在与之连接的消费者中选举出消费者leader
下发leader消费者返回的消费者分区分配结果给所有的消费者
管理消费者的消费偏移量提交,保存在kafka的内部主题中
和消费者心跳保持,知道哪些消费者已经死掉,组中存活的消费者是哪些
组成组件
KafkaConfig:实例化OffsetConfig和GroupConfig
GroupMetadataManager:管理GroupMetadata,包括组员元数据
ReplicaManager:负责leader和follow的Replica管理
ZKUtils:Zookeeper获取内部Topic的分区元数据
DelayedJoin:监视消费者组所有消费者的加入
DelayedHeartbeat:监视所有消费者组的消费者与组协调器之间的心跳
ConsumerCoordinator
(每个KafkaConsumer都会实例化一个)
(每个KafkaConsumer都会实例化一个)
负责工作
更新消费者缓存的matadata
向组协调器申请加入组
消费者加入组后的相应处理
请求离开消费组
向组协调器提交偏移量
通过心跳,保持组协调器的连接感知
被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器
非leader的消费者,通过消费者协调器和组协调器同步分配结果
基础组件
HeartBeat
HeartBeatThread:消费者和组协调器的心跳检测
ConsumerInterceptor:拦截器
MetadataSnapshot:元数据快照
ConsumerNetworkClient:消费者网络层封装
OffsetCommitCallback:提交偏移量接口回调
PartitionAssignor:定义分区与消费者的分配策略
RangAssignor
RoundRobinAssignor
消费者入组过程
(由CosumerCoordinator和GroupCoordinator完成)
(由CosumerCoordinator和GroupCoordinator完成)
消费者偏移量维护
是以CosumerGroup+Topic+Partition来存储在kafka的,这就要求消费者要想kafka提交消费的偏移量
提交偏移量的方式
自动提交偏移量
设置 enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。
这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。
手动提交偏移量
提交时机
poll()完立即提交
一旦处理到中间某条消息的时候异常,由于偏移量已经提交,那么出问题的消息位置到提交偏移量之间的消息就会丢失。
poll()完后等待消费完消息后再提交
有可能在处理到一半的时候发生再均衡,此时偏移量还未提交,那么再均衡后,会从上次提交的位置开始消费,造成重复消费。
提交方式
commitSync()是同步提交偏移量
只要没有发生不可恢复错误,会进行重试,直到成功
commitAsync()异步提交偏移量
只管提交,而不会等待broker返回提交结果
commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,
如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。
那么如果此时发生再均衡,新的消费者将会重复消费消息。
commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,
如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。
那么如果此时发生再均衡,新的消费者将会重复消费消息。
最佳方式
正常消费消息时,消费结束提交偏移量,采用异步方式
如果程序报错,finally中,提交偏移量,采用同步方式,确保提交成功
再均衡前的回调方法中,提交偏移量,采用同步方式,确保提交成功
如果程序报错,finally中,提交偏移量,采用同步方式,确保提交成功
再均衡前的回调方法中,提交偏移量,采用同步方式,确保提交成功
0 条评论
下一页