RocketMQ延时消息流程
2023-02-07 09:10:33 10 举报
RocketMQ延时消息流程
作者其他创作
大纲/内容
存储之前,需要先对延时消息进行处理
创建一个新的消息,清除延时级别属性并恢复原来的主题和队列
备份真实主题和队列id,REAL_TOPIC为延时之前的topic,REAL_QID为延时之前的queueId
如果是普通消息,则是生产者发送时的topic和queueId, 如果时重试消息,则是重试的topic和queueId,在消费之前会再恢复成消息原本所属的topic
从起始消费进度开始解析出每个消息
之后就是消息的存储流程了
根据延时级别查找消费队列
在broker启动时就会启动该服务
判断延时消息到时间没,到时间就从commitLog取出来
CommitLog
ScheduleMessageService
DeliverDelayedMessageTimerTask(延时消息定时调度任务)
将新的消息存入commitLog并转发给原本主题队列中供消费者消费
设置topic为SCHEDULE_TOPIC_XXXX,queueId根据延时级别设置
创建定时任务,每隔10s持久化延时队列的消息消费进度
上面的其它情况则重新创建定时任务重新走流程判断
为每个延时级别创建一个定时器,1s后执行
根据偏移量和消息大小从commitLog文件中加载完整的消息
0 条评论
下一页