RocketMQ消费者消费消息流程
2025-02-12 19:55:14 0 举报
一文读懂RocketMQ消费者消费消息流程
作者其他创作
大纲/内容
计算从队列上拉取消息的偏移量
run方法
消费顺序
ReadWriteLock
key3
调用start()启动消费者
遍历每条消息
msgTreeMap不为空?
释放锁
RebalancePushImpl
TagA
完成
检查配置
此处可以看到消息还是通过拉的方式,并不是推模式,只是外面封装为push操作而已!!!!
实现类
TagC
for循环for (int i = 0; i < batchSize; i++) {
②
提交偏移量,offerSet加一
ConsumeMessageService
key5
付款
dispatchPullRequest
将偏移量存储对象load到本地缓存offsetTable中(广播模式)
获取订阅该topic的订阅数据
pullMessageAsync
推送
③
executePullRequestImmediately或executePullRequestLater
TagD
key2
实例化消息监听器对应的消费消息service
ProcessQueue
LocalFileOffsetStore
集群模式
key8
pullMessage(pullRequest)方法
DefaultMQPushConsumerImpl
pullStatus为Found?
key7
RebalanceImpl
pullAPIWrapper
遍历所有的pullRequestList
switch根据不同的serviceState走不同的逻辑
ConsumeRequest
①②③④
key6
SHUTDOWN_ALREADY
updateProcessQueueTableInRebalance
queueId:1
operationSucceed
MessageListenerOrderly顺序消息监听器
处理顺序消息:ConsumeMessageOrderlyService并发处理消息:ConsumeMessageConcurrentlyService
org.apache.rocketmq.example.quickstart.Consumer为例
synchronized同步锁加锁
订阅topic
communicationMode默认为ASYNC
RemoteBrokerOffsetStore
获取批量消费消息的最大大小,一次性最大拉多少消息消费
将ConsumeRequest线程提交到线程池consumeExecutor中
线程二
InvokeCallback
设置消息者组
orderId=15103111039
一
①
key9
设置消费者组设置消费模式设置消息队列分配策略设置MQClientInstance
返回List<MessageExt> msgs
这里消费者真正的接收到消息,进行业务处理!
START_FAILED
二
pullKernelImpl真正拉取消息的入口
重点关注
根据topic拿到消息队列集合根据消费者组拿到所有的消费者id选择一个负载均衡算法策略,默认是平均分配负载均衡算法
ConsumeMessageOrderlyService
拷贝订阅关系
创建
getOrCreateMQClientInstance和消息发送者启动流程一样,先从本地缓存中查询,查询不到,则创建,然后放入缓存中
构建PullMessageRequestHeader请求
构建ConsumeRequest(实际上是个线程!)
ReadWriteLock treeMapLock = new ReentrantReadWriteLock();加写锁treeMapLock.writeLock().lockInterruptibly();
pullCallback.onSuccess(pullResult);
ReadWriteLock consumeLock = new ReentrantReadWriteLock();consumeLock加读锁
RUNNING
doRebalance方法
pullRequest包含:①从这个mq上计算出要消费的下一个偏移量②消费者组③消息队列④处理队列
广播模式
orderId=15103117235
加写锁
key1
NettyRemotingClient
processPullResponse,返回一个PullResult
启动消费消息service(开启线程)
由于 TreeMap 是有序的,所以每次调用 pollFirstEntry() 都会移除并返回键值对中键值最小的条目,这对于需要按顺序处理元素的应用场景非常有用
和生产者启动流程一样
invokeAsync
拉取消息成功后的回调处理
key0
三
以消费顺序消息为例
fetchLockObject
①遍历所有的消息队列②构建pullRequest,然后放入pullRequestList
pullResult
DefaultMQPushConsumer
orderId=15103111065
Consumer
queueId:3
MQClientInstance
一二三
MQClientAPIImpl
CREATE_JUST(初始值)
该线程一直处于runnable状态
从font color=\"#e74f4c\
msgTreeMap = {TreeMap@3456} size = 7 0 = {TreeMap$Entry@5452} \"16\" -> \
从队列中take消息
queueId:2
实例化广播或者集群模式下的偏移量存储对象
processPullResult
线程一
key4
注册消息监听器,如MessageListenerConcurrently表示并发消费监听器
注册消费者到本地缓存consumerTable中
new ConsumeOrderlyContext(this.messageQueue);
开启重平衡线程RebalanceService
消息监听器广播模式集群模式
④
将pullRequest请求存入一个阻塞队列中LinkedBlockingQueue<MessageRequest> messageRequestQueue
这里和消息者启动前设置的消息监听器对应!
释放锁treeMapLock
开启拉取消息服务线程PullMessageService
RequestCode.PULL_MESSAGE
这里是真正的提交消费消息请求
MessageQueueLock
是
不断地从队列中拉消息
执行run方法
start方法
将PullResult中的List<MessageExt> msgFoundList这个消息集合putMessage中
rebalanceByTopic
takeMessages批量获取消息
PullMessageService(线程)
0 条评论
下一页