普通消息消费过程
2022-04-25 21:09:50 9 举报
普通消息消费过程
作者其他创作
大纲/内容
rebalanceImmediately()
启动消费者
MQClientInstance
1.发送请求获取NameServerAddr2.启动netty客户端3.启动各种定时任务4.启动消息拉取服务5.启动重平衡服务
DefaultMQPushConsumer
start()
2.start()
循环监听阻塞队列 pullRequestQueue,提交了拉取请求就取broker拉取消息1.重平衡,分配了新的Queue 2.单次拉取结束后(流控)
DefaultMQPushConsumerImpl
subscribe订阅topic
启动消费者实现类
updateTopicSubscribeInfoWhenSubscriptionChanged
创建拉取消息的核心类
启动客户端实例
checkClientInBroker() SQL92过滤表达式上传到broker编译
copySubscription()
启动消费服务
拷贝订阅关系到RebalanceImpl,Consumer在重平衡时需要用到,除了拷贝给定的Topic订阅关系,Consumer还会自动订阅ConsumerGroup的重试队列
ConsumeMessageService
加载消费进度
PullMessageService
PullAPIWrapper
OffsetStore
启动消息拉取服务
new
1.subscribe()
创建ConsumeMessageService并启动,如果是有序消息,创建ConsumeMessageOrderlyService,并发消费创建ConsumeMessageConcurrentlyServiceConsumeMessageService是一个线程池,消息拉取服务拉取到消息后,会构建ConsumeRequest对象交给线程池调度执行
1.校验GroupName 2.校验消费模式:集群/广播 3.校验ConsumeFromWhere 4.校验开始消费的指定时间 5.校验AllocateMessageQueueStrategy 6.校验订阅关系 7.校验是否注册消息监听 8.校验消费线程数 9.校验单次拉取的最大消息数 10.校验单次消费的最大消息数
1.构建订阅关系对象 SubscriptionData 2.将订阅关系保存到Map中
checkConfig()
rebalanceImmediately()自身马上重平衡一次
注册监听器registerMessageListener
根据消息消费模式,创建对应的消息存储实例广播:LocalFileOffsetStore集群:RemoteBrokerOffsetStore
0 条评论
下一页
为你推荐
查看更多