Push消费流程
2021-06-09 19:51:55 41 举报
RocketMQ消费流程
作者其他创作
大纲/内容
pullRequest
RebalanceService
wake
pullRequestQueue.take()doFlowControll()asyncPullMessage(32)
rebalanceByTopic
executePullRequestLater
pullRequestQueue
NettyClientPublicExecutor ThreadPool
executePullRequestImmediatelly
ConsmeRequest.run()
Java Class
waitForRunning(20)
doRebalance
msgList
NOTIFY_CONSUMER_IDS_CHANGED
阻塞队列
红黑树
queue 2
submitConsumeRequest
消息
Topic B
Class
consumeMessageBatchMaxSize = 2
ConsumeRequestConsumeMessageThreadPool
consumeMessageService.submitConsumeRequest
queue 1
不同颜色代表不同的线程的方法调用
RemoteBrokerOffsetStore
dispatchPullRequest
processConsumeResult()
case FOUND:
Broker
updateOffset
PullMessageService
case NO_NEW_MSG:NO_MATCHED_MSG:
ConsumeMessageConcurrentlyService
put to msgTreeMap
consumeMessageBatchMaxSize = 1
listener.consumeMessage()
run()While(true) loop
Method Call
Topic A
ConsumeRequest
0 条评论
下一页