rocketMQ-2-启动流程
2023-08-03 16:44:52 0 举报
带你从模块级、核心步骤去看RocketMQ的启动流程。
作者其他创作
大纲/内容
ExecutorService remotingExecutor初始化远程通信执行器
MQClientInstance.start()启动客户端实例
初始化RPC请求钩子RPCHook
mQClientFactory.rebalanceImmediately()
根据配置的通信模式,注册监听器:当证书变化时,重载ssl context
mQClientAPIImpl.start():remotingClient.start()启动远程客户端
1.并发消费:延迟15m后,每过15分钟清理一次超时未处理完消息2.顺序消息:延迟1s后,每20s锁一次broker中mq队列+消费者端ProcessQueue
startScheduledTask()开启定时任务,1)延迟10ms后,获取每2分钟更新一次nameserver地址2)延迟10ms后,每30s去nameserver获取最新topic路由3)延迟1s后,每30s移除一次下线的broker+给所有broker发送心跳(版本号)4)延迟10s后,每5s持久化“消费offset”到broker/本地。5)延迟1ms后,每1分钟,根据待消息数,动态调整消费者线程池核心数。(被阉割,目前是空)
consumeMessageService.start()初始化消费消息服务,并启动
brokerOuterAPI.start():remotingClient.start()启动OuterAPI专用远程通信客户端
checkConfig()校验配置
初始化9种线程池执行器
pullAPIWrapper创建pull请求包装器
服务状态更新为RUNNING
1.BrokerController.initialize()初始化
核心步骤
rebalanceImpl创建消费端负载均衡
messageStore启动消息存储服务:reputMessageService.start():
consumer消费者启动流程
start
DefaultMQPushConsumerImpl.start()
producer生产者启动流程
filterServerManager.start()启动过滤服务
初始化AccessValidator
brokerStatsManager.start()启动状态管理器
NamesrvStartup.start()
初始化事务相关服务、监听器
brokerFastFailure.start()启动快速失败,清除过期请求
BrokerStartup.start()
MQClientInstance创建客户端实例
初始化NettyRemotingServer远程通信服务端
通信流
NameServer名字服务启动流程
printAllPeriodically定时任务:1s后,每10s周期打印kv配置参数
pullRequestHoldService.start()开启请求长轮训服务(持有5S请求)
offsetStore初始化偏移量
构建NettyRemotingServer初始化netty通信实例
fileWatchService.start()启动SSL证书监听服务
RocketMQ启动流程
updateTopicSubscribeInfoWhenSubscriptionChanged()
pullMessageService.start()开启守护线程,执行拉取消息
注册SSL证书变化监听器
copySubscription复制订阅信息
mQClientFactory.sendHeartbeatToAllBrokerWithLock()
载入各种配置文件(topic配置、消费偏移量、订阅群组、消费过滤)
remotingServer.start()启动远程通信服务端(netty)
broker启动流程
rebalanceService.start()启动负载均衡器
mQClientFactory.checkClientInBroker()
1.NamesrvController.initialize()初始化
定时任务:每30s向nameserver注册所有broker
启动客户端实例
DefaultMQProducerImpl.start()
2.BrokerController.start()
广播模式:client存储集群模式:broker存储
registerProcessor():注册处理器
DefaultMQProducerImpl().start()启动消息生产者
mQClientAPIImpl.fetchNameServerAddr()获取nameserver地址
客户端实例状态更新为RUNNING
初始化DefaultMessageStore消息存储器
kvConfigManager.load()加载kv配置
fastRemotingServer.start()启动VIP远程通信服务端(netty)
synchronized
初始化9个定时任务
2.NamesrvController.start()启动
0 条评论
下一页