RocketMQ 顺序消费
2022-02-21 18:19:20 19 举报
RocketMQ 顺序消费
作者其他创作
大纲/内容
是
已经获得Broker的锁并且没有过期
转发到死信队列
processQueue.isLocked()
更新本地消费进度
处理消费结果
如果消息队列没被锁住或锁过期(默认超过30s过期)了,则该消费者锁住该消息队列,如果已被锁住并且没有过期,则锁队列失败
①
限流
启动时,每隔20s向broker发送锁定该消费者分配到的消息队列的请求
正常的拉取流程,参考拉取的那张流程图
锁定失败的,处理队列locked=false
成功
调用重平衡组件,真正开始执行重平衡逻辑
成功锁定的,处理队列locked=true
获取消息队列锁对象
pullMessage
持久化该MessageQueue的消费进度后删除本地消费进度缓存
RebalanceLockManager
释放消费锁lockConsume
则跳过该队列,回到①处继续处理下一个队列,但该队列已经不能继续被消费了
新队列中不包含原先的队列
用消息队列和处理队列构建消费请求提交到消费线程池
true
消费状态
对队列信息和消费者客户端id排序
Broker
。。。
是否到达最大重试次数
PullMessageService(拉取线程)
DefaultMQPushConsumerImpl
PullCallback#onSuccess
ConsumeMessageOrderlyService
释放消费锁
消息重新放回processQueue,挂起消费队列一会儿,稍后继续消费
失败
从随机的一个broker中获取该消费组中所有的消费者客户端IDcidAll
对比原先的队列和当前新分配的队列
如果该消费队列是第一次拉取消息,则先计算拉取偏移量
RebalanceService(重平衡线程)
MQClientInstance
RebalanceImpl
RebalancePushImpl
设置拉取时间
获取该topic对应的队列信息mqSet
移除该消息队列removeUnnecessaryMessageQueue
根据负载均衡策略重平衡,获得该消费者新的队列集合
消息拉取成功后重新将PullRequest加入到pullRequestQueue,继续该队列的下一次消息拉取
遍历订阅的每个topic,对每个topic中的队列都重新负载均衡
向broker同步发送解锁该消息队列的请求
计算从哪开始拉取
停止原先队列消息消费(ProcessQueue.Dropped=true)
run()
ConsumeRequest
拉取的流程跟并发消费区别在这一步判断
消费之前先获取processQueue中的消费锁lockConsume
原先队列中不包含新分配的队列
从内存中删除该消息队列的消费进度
pullRequestQueue是否为空
doRebalance(boolean isOrder)
调用自定义的消费逻辑
遍历每个消费组的消费者实例,分别执行重平衡
判断processQueue.isDropped()
是否成功锁住队列
获取该消费组下消息队列锁定情况
从processQueue获取消费锁lockConsume
拉取消息
pullRequestQueue.take()
空,则阻塞等待
根据并发消费数循环的从某个processQueue中顺序取消息,直到processQueue中的消息都消费完了或该线程消费时间限制到了
将拉取请求延迟3s后再重新放回阻塞队列
将消息队列和处理队列提交给顺序消费服务
将拉取到的消息全部放到processQueue
doRebalance()
向broker同步发送锁队列请求
创建新的PullRequest放到拉取任务队列中
否
创建一个新的ProcessQueue与该消息队列对应(locked=false)
获取该消费队列锁定记录,如果被该消费者锁定,则删除锁定记录
每隔20s执行一次重平衡
清空consumingMsgOrderlyTreeMap,返回消费进度
0 条评论
下一页