【RocketMQ】消费流程Overview
2022-03-26 23:57:30 9 举报
rocketmq消费过程
作者其他创作
大纲/内容
submitConsumeRequest
updateOffset
queue1
pullRequest
waitForRunning(20)
TopicA
RebalanceByTopic
pullRequestQueue.take()doFlowControl()asyncPullMessage(32)
executePullRequestImmediatelly
queue2
ConsumeRequest是一个Runnable,这个任务中会调用listener处理消息1.判断这个ProcessQueue是不是被Drop2. 构造ConsumeConcurrentlyContext3. 用Context执行钩子函数4.
run()while(true) loop
NettyClientPublicExecutorThreadPool
ConsumeMessageConcurrentlyService
TopicB
msgList
ConsumeMessageBatchMaxSize1=default
executePullRequestLater
PullRequestService
processConsumeRequest()
RemoteBrokerOffsetStore
consumeMessageService.submitConsumeRequest
case NO_NEW_MSG;NO_MATCHED_MSG;
put to msg TreeMap
ConsumeRequestConsumeMessageThreadPool
ConsumeRequest.run()
Broker
listener.consumeMessage()
pullRequestQueue
dispatchPullRequest
case FOUND
doRebalance
0 条评论
下一页