RocketMQ Consumer上报消费进度流程
2022-04-12 09:30:21 7 举报
RocketMQ Consumer上报消费进度流程
作者其他创作
大纲/内容
用topic@group作key
向消费进度管理组件提交消费进度
循环该实例上的每个消费者,分别调用各个消费者的持久化接口
获得一个broker地址,正常情况下都是master broker
startScheduledTask()
单向发送更新进度请求
使用消费进度组件持久化全部队列消费进度
启动持久化消费进度任务,默认每5s执行一次
Consumer
更新queueId对应的消费进度
map == null
false
start()
MQClientInstance
MQConsumerInner(DefaultMQPushConsumerImpl)
OffsetStore(RemoteBrokerOffsetStore)
以单向方式更新消费进度到broker
创建一个
从本地消费进度缓存offsetTable中遍历每个队列当前的消费进度
true第一次上报
updateConsumerOffset方法
ConsumerManageProcessor
ConsumerOffsetManager
Broker
构建更新队列消费进度的请求数据
从重平衡组件里获取该消费者分配的消息队列
Consumer启动时会同时启动上报消费进度的定时任务
用key从消费进度缓存表取出对应的消费队列和消费进度的map
0 条评论
下一页