rocketMQ-4-负载均衡-消费者
2023-08-03 17:27:52 0 举报
rocketMQ消息分配给消费者的负载均衡策略详解,全网唯一!!!架构师必会!
作者其他创作
大纲/内容
impl
ConsumerManageProcessorgetConsumerListByGroup()根据group获取consumerIdListcode:GET_CONSUMER_LIST_BY_GROUP
3.pullMessage(pullRequest)无限循环拉取消息请求(发给broker)
开启
messageQueue5
(3)新增
2.RebalanceImpl.updateProcessQueueTableInRebalance()
consumerId1
pullRequestQueue
getConsumerGroupInfo()
PullMessageService拉取消息服务
RebalanceImpl.rebalanceByTopic()遍历当前客户端所有订阅的topic(集群消费为例)做负载均衡:默认使用平均算法执行分配
messageQueue3
messageQueue2
算法
registerConsumer()
TopicA 负载均衡算法分配的mqSet
消费者启动DefaultMQPushConsumerImpl.start()
messageQueue7
topicSubscribeInfoTable根据topic获取mqSet
AllocateMessageQueueAveragely.allocate()先排序,再用默认的平均分配策略
consumerId2
messageQueue1
(2)超时,移除
消费者负载均衡全流程(messageQueue和consumerId分配,基于DefaultMQPushConsumerImpl实现)
AllocateMessageQueueAveragely.allocate()分均分配
ClientManageProcessorheartBeat()注册消费者code:HEART_BEAT
messageQueue8
put
messageQueue4
this.rebalanceService.start()启动负载均衡服务线程:1.每20s执行一次消费端负载均衡mqClientFactory doRebalance()
NettyRequestProcessor接口processRequest()
MQClientInstance客户端实例启动
messageQueue6
不变
更新指定topic,分配新的mqSet对应的processQueueTable
老的processQueueTable
broker
take
this.pullMessageService.start();启动拉起消息服务线程
(1)多余,移除
get
put添加进待拉取请求队列
ConsumerManager
0 条评论
下一页