RocketMQ消费者消费消息流程简述
2025-02-12 09:13:09 0 举报
源码级别快速读懂RocketMQ消费者消费消息流程
作者其他创作
大纲/内容
1:实例化DefaultMQPushConsumer,设置消费者组
2:设置消费消息消费模式,从哪里开始消费
3:订阅指定的topic
4:注册监听器,用来监听消息,主要通过广播或者集群模式下的几种消费消息实现类,如集群模式下的顺序消费消息service、集群消息service
5:启动消费者DefaultMQPushConsumer
1:获得MQClientInstance mQClientFactory这个实例,先从本地map中获取,没有则实例化一个,并存入缓存中
2:调用start方法启动消费消息服务类,比如顺序消费消息实现类ConsumeMessageOrderlyService、并发消费消息实现类ConsumeMessageConcurrentlyService
3:将消费者注册到消费者组中,实际上是一个缓存map
4:调用start方法启动mQClientFactory(重要入口)
1:调用Start方法启动NettyRemetingClient客户端,开启 request-response channel
启动NettyRemetingClient客户端,开启 request-response channel
启动很多定时任务,都是用定时任务线程池封装的,如心跳机制
启动拉取消息服务PullMessageService线程,用来拉取消息的(②重要)
cpu调度run方法时,干了什么?
1:messageRequestQueue.take();从阻塞队列中拿到MessageRequest
2:执行pullMessage拉取消息
首先一大堆检查,如服务是否可用、消息大小阈值判断、计算拉取消息的偏移量等
其次调用pullKernelImpl方法来从broker异步拉取消息列表
主要是借助NettyRemotingClient客户端来异步回调批量拉取消息,从一个消息队列上,发送pullRequest请求,一次拉取的消息大小最大默认为32M
细节:封装PullMessageRequestHeader,然后创建RequestCommand:RequestCode.PULL_MESSAGE
最后broker服务端接收到该拉取请求后,进行处理,处理完成之后,通过异步回调接口,将消息返回
通过InvokeCallback回调接口的operationSucceed,拿到响应
调用processPullResponse处理响应response,主要是将响应进行解析,然后封装为PullResult对象
然后再调用回调接口PullCallback的onSuccess对pullResult进行处理
pullResult方法干了什么?
1:首先processPullResult处理pullResult,实际上是增强了pullResult,进一步封装
2:其次ProcessQueue调用putMessage方法(将消息存入treeMap中,自带顺序性,底层是红黑树结构)
1:其次ReadWriteLock treeMapLock,加写锁
2:循环遍历pullResult中的List<MessageExt>将MessageExt放入到这个TreeMap<Long, MessageExt> msgTreeMap中,key是队列中的偏移量,value为消息,同时设置dispatchToConsume为true
3:释放锁
3:提交消费请求consumeMessageService.submitConsumeRequest(实际上是将ConsumeRequest这个线程提交到线程池consumeExecutor中)
consumeExecutor线程池的参数如下:
corePoolSize:20个
maximumPoolSize:20个
keepAliveTime:1000 * 60
TimeUnit.MILLISECONDS
consumeRequestQueue:new LinkedBlockingQueue<>()
线程名:ConsumeMessageThread_consumerGroupTag
ConsumeRequest这个线程干了什么?(顺序消费消息)
run方法中
1:对当前消息队列上锁,final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
2:synchronized (objLock),使用同步代码块,加锁
判断是不是广播模式或者processQueue的locked为true且没有过期,满足条件,则继续往下走
满足上面条件
判断集群模式下,该消息队列是不是被锁或者没有锁超时,没有被锁或者锁超时了,过会再加锁和重新消费tryLockLaterAndReconsume,延时10ms
获取最大能批量消费消息的大小,默认1次
通过处理队列,拿到这个队列上上的所有消息,List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
核心代码如下:加写锁->从msgTreeMap中pollFirstEntry获取并移除第一个元素,即消息->将消息存入List<MessageExt>中->释放锁
通过consumeLock继续加锁,这次加的是读锁
调用监听器的消费消息方法,messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);自此消费者Consumer启动前注册的监听器就监听到了消息,设置自动提交标志位autoCommit为true,然后开始业务处理
消息消费成功后,释放锁
消息消费完成后,提交偏移量
如果消费不成功,比如超时,则提交一个SUSPEND_CURRENT_QUEUE_A_MOMENT,即阻塞当前队列一会,过会继续消费该消息,调用makeMessageToConsumeAgain,重新treeMapLock.writeLock().lockInterruptibly();对该队列加写锁,然后遍历每条消息,consumingMsgOrderlyTreeMap中移除当前的消息,重新往msgTreeMap中缓存消息,最后释放锁,接着submitConsumeRequestLater,重新提交消费请求
ConsumeRequest这个线程干了什么?(并发消费消息)
并发消息和顺序消费的区别点
①在从broker拉取完消息后,进行消息提交,即调用submitConsumeRequest方法
顺序消费消息:是将ConsumeRequest提交到线程池中,然后执行run方法,开始对消息队列进行加同步锁,然后从一个TreeMap<Long, MessageExt> msgTreeMap中pollFirstEntry,顺序拿到消息,接着调用监听器的消费消息方法,开始进行对消息的业务处理,如果消费成功,则提交偏移量,消费不成功,则会阻塞当前队列一段时间,过会继续重新将消息队列依次加入到上面的treeMap中,然后再延迟重新提交消费请求submitConsumeRequestLater
并发消费消息:①不用加锁,上来就获取批量消费消息大小,consumeBatchSize默认1次,然后用消息大小和consumeBatchSize批处理比较,如果消息大小小于等于consumeBatchSize,说明就一个消息,直接就将ConsumeRequest提交到线程池中,如果消息大小大于consumeBatchSize,那么则进行双层for循环处理
代码如下
②执行并发消费的ConsumeRequest中的run方法时
直接调用监听器的消费消息方法listener.consumeMessage(Collections.unmodifiableList(msgs), context);
默认自动提交偏移量
启动负载均衡服务rebalanceService线程,用来消费者通过负载均衡算法找到消息,并将消息最后存入到阻塞队列中,由PullMessageService来拉取消息(①重要)
首先来看看这个rebalanceService线程启动后,run方法
1:调用doRebalance开始进行负载均衡服务调用
2:再到rebalanceByTopic方法
首先选择一个负载均衡算法,包括随机、轮询、hash等
其次封装pullRequestList,然后开始转发该请求dispatchPullRequest(pullRequestList, 500);
dispatchPullRequest干了什么
1:遍历所有的PullRequest
2:针对每个PullRequest,都延时0.5秒后再执行executePullRequestImmediately方法
executePullRequestImmediately干了什么?
0 条评论
下一页