普通消息消费详细过程
2022-04-28 17:51:38 19 举报
rockemtq 普通消息消费的详细过程
作者其他创作
大纲/内容
25.将消息提交到ConsumeMessageService 内部的线程池 consumeExector 中
它是一个线程,直接看它的run()方法就可以了
4.创建消息拉取的核心类
6.1 启动消费者客户端和broker建立连接
26.ConsumeRequest#run()
37.如果发送到broker失败
从commitlog读取消息
PullAPIWrapper
22.PullCallback#onSuccess()
18.PullMessageProcessor#processRequest()
file
5.创建消费消息的服务类并启动
发送命令:RequestCode.CONSUMER_SEND_MSG_BACK参考第16步骤
是
PullResult中的消息是否empty
RebalanceService#start()
如果队列不为空
broker端接收
并发消费最大跨度偏移量是否大于2000?
MQClientAPIImpl#start()
41.SendMessageProcessor#asyncConsumerSendMsgBack()
startScheduledTask()
将消息按照 consumeBatchSize分割,分批次放入到 consumeExector 线程池中
否
对开发者直接暴露的消费者类
消费成功
24.ConsumeMessageService#submitConsumeRequest()
32.集群模式
异步拉取消息
9.启动 6.3步骤中的重平衡服务
14.构建回调对象PullCallback
开始
PullMessageService#start()
顺序消息:ConsumeMessageOrderlyService 其他消息:ConsumeMessageConcurrentlyService
10.调用内部的重平衡服务
则直接将PullRequest放入阻塞队列中,等待再次读取
12.准备拉取消息
再次放入阻塞队列中,这样就可以持续从queue中拉取消息了
如果队列为空
阻塞
在 6.2步骤中启动的定时任务中就有持久化任务,每隔5s执行一次
监听阻塞队列pullRequestQueue
13.流控处理:在去broker拉取消息之前,进行必要的流控处理,流控是用来保护消费者的,当消费者消费能力不够时,拉取速度太快会导致大量消息积压,很可能内存溢出
while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error(\"Pull Message Service Run Method exception\
将消费者的消费进度持久化到 broker 的 consumerOffset.json文件中
该方法中有一个比较熟悉的 pullBatchSize 参数,默认是32
准备拉取消息
15.PullAPIWrapper#pullKernelImpl()
19.MessageStore#getMessage()
它是步骤4中创建的拉取消息核心类
默认持久化到当前用户主目录下的.rocketmq_offsets/${clientId}/${group}/Offsets.json 文件中
29.处理消费结果
广播模式
20.CommitLog#getMessage()
将消息发送到broker
这里的分配好的意思是,我当前启动的客户端需要消费哪些队列已经确定了
35.持久化
1.DefaultMQPushConsumer#start()
它就是步骤14提到的回调对象
2.DefaultMQPushConsumerImpl#start()
6.start()
1.主要是设置新的topic2.重试次数+13.将原始topic放入properties属性中,留着后面解析消费
准备从broker拉取消息
消费失败
目的是获取这条信息的最初的完整信息
内部维护了一个 ThreadPoolExecutor 线程池,当拉取到消息后提交到线程池中进行消费
3.创建客户端实例
msgs.size()默认是32条,因为 pullBatchSize 默认是32,可以修改;consumeBatchSize 默认是1,取值于 consumeMessageBatchMaxSize, 默认是1,可以修改。他俩修改的值具有一定的相关性
内部的消费者实现类启动
Queue缓存的消息数是够超过1000?
顺便说一句,如果重试超过了默认的最大重试次数16,则这里的topic将替换为死信队列的topic
判断 PullResult是否为空
为什么要立即启动一次重平衡?因为要给consumer指定它消费的queue是哪些
msgs.size() <= consumeBatchSize
处理消息,将二制消息解码为java对象MessageExt,也会对消息进行tag过滤
39.MQClientAPIImpl#consumerSendMessageBack()
启动客户端实例
RequestCode.CONSUMER_SEND_MSG_BACK
PullCallback#onException()
public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info(\
这里就比较熟悉了,我们生产消息都会来到这里
直接丢弃消息不进行重试
46.写入commitlog
Queue缓存的消息字节数是否超过100MB?
实际上在第24到之后步骤他就是一个异步任务了,36步骤并不是严格按照这个顺序,当进入第24步骤后,也就是进入线程池后,36步可能就开始执行了,大家看源码就知道,我这里画的并不是很好。在这里拉取消息时,也可以指定延迟多长时间后再去拉取消息,参数是 pullInterval
集群模式
6.2 启动各种定时任务,其中有一个是持久化消费者的消费进度offset
23.PullAPIWrapper#processPullResult()
21.解析成PullResult 对象
定时任务
CommitLog#asyncPutMessage()
44.构建新的消息体对象 MessageExtBrokerInner
DefaultMQPushConsumerImpl#sendMessageBack()
7.监听
38.了解如何发送到broker
8.客户端立即启动一次重平衡
MQClientInstance(mQClientFactory)
submit到线程池
5.1 start()
这个暂时忽略,当从broker拉取到消息后,它进行回调处理,后面再说,这里先跳过看15步
延迟3秒后将PullRequest重新放入阻塞队列中等待再次读取消费
30.check消费结果
42.构建重试topic%RETRY%consumerGroup
RebalancePushImpl#dispatchPullRequest()
MessageListenerConcurrently#consumeMessage()
40.SendMessageProcessor#processRequest()
17.MQClientAPIImpl#pullMessageAsync()
36.再次将PullRequest放入阻塞队列中
RebalanceImpl#doRebalance()
34.记录消费位点(就是记录该消费者消费当前queue到哪里了)
11.分配好之后,构建 PullRequest 对象,7中的阻塞队列监听的就是这个对象
16.MQClientAPIImpl#pullMessage()
DefaultMessageStore#asyncPutMessage()
43.设置延迟等级
刚开始启动这里肯定是空的,因为还没有往队列中放入数据,那么什么时候放入呢?请从第9步往后看
this.mQClientFactory#rebalanceImmediately()
根据queue和偏移量等信息,从commitlog中读取文件,共计读取32次消息,放到集合中返回
31.广播模式
ConsumeMessageService
28.消费者开始消费消息
consumeExecutor
33. 从本地缓存队列中移除已经消费的消息
45.准备写入消息
消息也不能丢弃,而是调用submitConsumeRequestLater方法,5秒钟后将失败的消息重新封装成ConsumeRequest提交到消费者线程池重新消费
0 条评论
下一页