rocketmq源码(八)——推模式Consumer消费消息
2024-09-17 11:25:23 5 举报
RocketMQ的推模式Consumer消费消息过程主要包括以下几个核心步骤:首先,Consumer订阅指定的Topic。接着,Broker端收到推送消息后,将消息放入对应的Topic和Queue中。随后,Consumer通过PullRequestHoldService组件从Broker拉取消息,并放入对应的Queue中。最后,Consumer后台线程池从对应的Queue中获取消息,并在成功消费后发送ack消息给Broker,表示消息已被成功消费。这一过程实现了消息的实时推送和消息的可靠传输。
作者其他创作
大纲/内容
获取topic路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic)
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
new
main()
this.pullMessage(pullRequest);
new ClientRemotingProcessor
registerConsumer()
RemotingCommand
new
ConsumeMessageConcurrentlyService
this.remotingClient.start();
从consumerTable中拿到启动时注册的消费者MQConsumerInner impl = entry.getValue();
this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener;
开启线程this.rebalanceService.start();
NO_NEW_MSG
broker服务端
this.namespace = namespace;
创建ProcessQueueProcessQueue pq = new ProcessQueue(); pq.setLocked(true);
回调pullCallback.onSuccess(pullResult);
this.copySubscription();
负载均衡器,客服端请求消息入口RebalanceImpl rebalanceImpl = new RebalancePushImpl(this)
消费者处理消息默认实现类this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.clientRemotingProcessor = clientRemotingProcessor;
pullRequest
start();
定时线程定期清理过期消息cleanExpireMsg();
new RemoteBrokerOffsetStore
producer生产者send(msg)
true
RemoteBrokerOffsetStore
this.messageListener = messageListener;
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
拿到消费者MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
NettyRemotingClient
开启一些定时任务startScheduledTask()其中一个,定时获取topic路由信息MQClientInstance.this.updateTopicRouteInfoFromNameServer();
this.mQClientFactory = mQClientFactory;
consumerTable(ConcurrentMap)
this.pullMessageService = new PullMessageService(this);
拿到topic下的消息队列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
netty的handler处理服务端响应new NettyClientHandler()
注册并发消息监听器consumer.registerMessageListener(new MessageListenerConcurrently())
设置消息监听器this.messageListener = messageListener;
将请求转换为RemotingCommand类型RemotingCommand request = RemotingCommand.createRequestCommand( font color=\"#e74f4c\
ClientRemotingProcessor
设置消费位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
开启消费者客服端mQClientFactory.start();
拿取消息成功,返回给消费者response.setCode(ResponseCode.SUCCESS);response.setBody(r);
new MQClientAPIImpl
run()
立即唤醒reblance服务,请求消息this.mQClientFactory.rebalanceImmediately();
start()
new ConsumeMessageConcurrentlyService
推模式获取消息DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(\"CID_JODIE_1\");
this.defaultMQPushConsumer = defaultMQPushConsumer;
registerMessageListener
responseCallback.callback(response);
更新topic路由、messagequeue、Broker信息this.updateTopicSubscribeInfoWhenSubscriptionChanged();
设置消息监听器this.messageListenerInner = messageListener;
//响应客服端请求ctx.writeAndFlush(response);
消费者组this.consumerGroup = consumerGroup;
消费者请求消息超时时间this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException()
启动消费者consumer.start();
PullAPIWrapper
ManyPullRequest mpr = this.pullRequestTable.get(key);
RebalancePushImpl
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
mQClientFactory.registerConsumer()
异步处理this.consumeExecutor.submit(consumeRequest);
将请求放入pullRequestQueuethis.pullRequestQueue.put(pullRequest);
this.consumerGroup = consumerGroup;
producer发送消息到服务端,持久化成功后调用ReputMessageService.doReput();
consumer消费者客服端
impl.doRebalance();
消费者消费queue分配策略this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely()
this.mqClientFactory = mqClientFactory;
this.rebalanceService = new RebalanceService(this);
将请求通过netty客服端channel发送出去channel.writeAndFlush(request)
拿到下一次拉取消息的偏移量offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
Switch pullStatus
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
MQClientAPIImpl
impl.pullMessage(pullRequest);
List<PullRequest> requestList = mpr.cloneListAndClear();
拿到订阅主题final String topic = entry.getKey();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
return response;
new PullMessageService
this.dispatchPullRequest(pullRequestList);
再次将请求发个请求队列this.pullRequestQueue.put(pullRequest);
PullMessageService
new PullAPIWrapper
new DefaultMQPushConsumerImpl
没有消息返回对应coderesponse.setCode(ResponseCode.PULL_NOT_FOUND);
启动this.consumeMessageService.start();
DefaultMQPushConsumer
注册消息监听器到默认实现类this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
new DefaultMQPushConsumer
线程run()
创建PullRequest拉取消息请求PullRequest pullRequest = new PullRequest();
开启线程this.pullMessageService.start();
默认消费者组绑定队列策略:平均
订阅主题consumer.subscribe(\"TopicTest\
AllocateMessageQueueAveragely
pullRequestQueue(LinkedBlockingQueue)
DefaultMQPushConsumerImpl
FOUND
触发消息到达事件DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic()
RebalanceService
PullRequest pullRequest = this.pullRequestQueue.take();
向消费者推送消息channel.writeAndFlush(response)
new RebalanceService
this.rebalanceService.wakeup();
msg
subscribe
启动消费者this.defaultMQPushConsumerImpl.start();
this.mQClientAPIImpl.start();
doRebalance(final boolean isOrder)
while (!this.isStopped())
组装请求PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
DefaultMQPushConsumerImpl推模式消费者模式实现
new MQClientInstance
rocketmq消费者启动成功后,发起拉取消息入口this.mqClientFactory.doRebalance();
RemotingCommand response = responseFuture.getResponseCommand();
this.waitForRunning(waitInterval);
key = group
RequestCode.PULL_MESSAGE
根据策略拿到所有符合条件group和topic下的消息队列
new AllocateMessageQueueAveragely()
MQClientInstance
拿到分配消息队列的策略(平均)AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
. . . . . .
0 条评论
下一页