RocketMQ 顺序消费(简化版)
2022-02-21 18:20:08 8 举报
RocketMQ 顺序消费(简化版)
作者其他创作
大纲/内容
false
用消息队列和处理队列构建消费请求提交到消费线程池
创建一个新的ProcessQueue与该消息队列对应(locked=false)
释放lockConsume
获得一个消息队列拉取请求
原先队列中不包含新分配的队列
锁成功
锁定失败的,处理队列locked=false
processQueue.isLocked并且没有过期
会有后台任务定时向broker发送锁队列请求
去拉取消息,将拉取到的消息全部放到processQueue
释放消费锁lockConsume
从processQueue获取消费锁lockConsume
后台线程每隔20s向broker发送锁定分配到的消息队列的请求
是
pullRequestQueue.take()获取一个消息队列拉取请求
run
processQueue.isLocked()
停止原先队列消息消费(ProcessQueue.Dropped=true)
消费
异步
每隔20s执行一次重平衡
循环取出消息
根据负载均衡策略重平衡,获得该消费者新的队列集合
获取processQueue中的消费锁lockConsume
true
成功锁定的,处理队列locked=true
processQueue.isDropped()
后台锁broker队列
新队列中不包含原先的队列
失败
向broker同步发送解锁该消息队列的请求
拉取并消费流程
重平衡流程
向broker同步发送锁队列请求
synchronized(消息队列锁对象)
成功
0 条评论
下一页