RocketMQ Consumer消费失败重试流程
2022-06-12 14:04:19 7 举报
RocketMQ Consumer消费失败重试流程
作者其他创作
大纲/内容
选择重试队列,默认就一个队列0
将消息的topic恢复成最原本的样子
如果消息重试次数已到达了最大重试次数 或者 延迟级别小于0,则再次改变主题为 \"%DLQ%\"+groupName
处理消费结果
设置topic为重试topic\"%RETRY%\"+groupName
消费者会从重试队列中拉取到消息
topic:TestTopicgroupName:TestGroup
ConsumeMessageConcurrentlyService
生成重试的topic(\"%RETRY%\"+groupName)
失败就重复上面的流程
由于发送重试消息不包含消息体,所以需要根据消息偏移量从commitLog取出原消息msgExt
向文件中写入该新设置的消息,如果是延时消息,则会再将主题改为 SCHEDULE_TOPIC_XXXX
Broker
延时消息处理逻辑(具体看延时消费逻辑)
获取broker地址,向broker同步发送重试消息
接收到重试消息,开始处理
Consumer
消费失败
创建一个新的消息msgInner,将重试的消息复制到新的消息中
SendMessageProcessor
将msgExt的原本主题(TestTopic)存到property中,key为\"RETRY_TOPIC\"
如果重试队列数量小于1,说明该消费组不支持重试,直接返回成功
如果没有延时,就将延时级别设为 3+已重试次数
自定义的消费逻辑
最终会将消息从延时队列存入重试队列
0 条评论
下一页