Kafka消费和Offset提交流程
2021-09-01 20:43:48 6 举报
Kafka消费和Offset提交流程
作者其他创作
大纲/内容
更新偏移量
是
ListenerConsumer
ConsumerAcknowledgment
非自动提交 + 非Record AckMode
批量拉取数据pollTimeoutpollCount
acks:待Ack的Offset的阻塞队列
默认:AckMode=Batch,ConsumerMode=非Batch一、AckMode:Record(每条记录)、Batch(批量)、Time(时间)、COUNT(待Ack数量)、COUNT_TIME(次数或时间)、MANUAL(手动)、MANUAL_IMMEDIATE(手动立即提交)二、Spring kafka ConsumerModeBatch(批量)、非Batch(单个)
非手动立即提交
批量执行
和当前偏移量比,大于当前偏移量才会更新
是否为当前线程
Note:前提 isAutoCommit为false1)由Spring管理我们的自动提交2)手动提交(手动提交和手动立即提交)
AckMode = Record?
否
提交偏移量
循环执行
ConsumerBatchAcknowledgment
业务手动提交
1、KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization()2、KafkaListenerEndpointRegistrar.registerEndpoint() KafkaListenerEndpointRegistrar.createListenerContainer()3、AbstractKafkaListenerContainerFactory.createListenerContainer() ConcurrentKafkaListenerContainerFactory.createContainerInstance()5、ConcurrentMessageListenerContainer AbstractMessageListenerContainer实现了 SmartLifeCycle 生命周期方法 在Bean初始化后,其onStartUp方法如果返回True,就会调用其start()方法 ConcurrentMessageListenerContainer.doStart()
业务onMessage(List<Record>)
非自动提交 + 非任何Manual手动提交模式
获取偏移量
业务onMessage(Record)
是否MANUAL_IMMEDIATE
2、更新
获取records中最大偏移量记录
拉取阻塞队列数据
更新
循环拉取
1、拉取
Spring Batch模式提交(处理上个循环的记录待提交的Offset)
TIME和TIME_COUNT模式提交偏移量条件是否满足
加入阻塞队列
3、提交逻辑
Spring Batct模式提交
是否批量消费
非手动提交
AckMode非TIme 切非Cout、
0 条评论
下一页