rocketmq
2022-10-22 03:37:22 8 举报
登录查看完整内容
rocketmq源码分析
作者其他创作
大纲/内容
初始化,创建一堆线程池
NettyRemotingAbstract#processMessageReceived
BrokerOuterAPI#registerBrokerAll
CommitLog#putMessage
设置netty客户端
有响应
mQClientFactory.start()
CountDownLatch,交给线程池
mQClientAPIImpl.start()
main0(args)
ctx.writeAndFlush(response)
MQClientAPIImpl#getTopicRouteInfoFromNameServer
clientHousekeepingService.start()
开启定时任务
new BrokerConfig()
创建producer
MQClientAPIImpl#start
监听kv配置文件
无响应
this.tryToFindTopicPublishInfo
NettyServerHandler
NamesrvController.start()
ChannelFuture#writeAndFlush
MQClientInstance#updateTopicRouteInfoFromNameServer
MQClientManager.getInstance().getOrCreateMQClientInstance
创建钩子函数,在JVM退出时释放controller资源
回调
设置nameServer地址
BrokerController.this.registerBrokerAll
putMessage
consumer.setNamesrvAddr
刷盘
NettyRemotingServer#start()
controller.initialize()
NettyRemotingClient#invokeSync
创建controller
producer.start()
MQClientInstance#start
createNamesrvController
接收消息
BrokerController#start()
启动
pullRequestHoldService.start()
registerBrokerWithFilterServer
获取queue(负载均衡)
返回controller
MQClientInstance#registerProducer
NettyRemotingAbstract#scanResponseTable
NettyRemotingAbstract#invokeSyncImpl
BrokerStartup#main
pullMessageService.start()
checkConfig()
ChannelFuture sync = this.serverBootstrap.bind().sync()
NameServer启动入口
加载KV配置
发送netty请求
selectOneMessageQueue
new MessageStoreConfig()
获取路由信息
KVConfigManager#printAllPeriodically
new NettyClientConfig()
请求nameServer
返回路由信息
setNamesrvAddr
发送
this.topicPublishInfoTable.get(topic)
Listener#onChanged
向所有Broker发送心跳
发送nett请求
this.remotingClient.invokeSync
接收到消息
DefaultMQProducer#start
brokerOuterExecutor.execute
启动存盘任务
注册
获取client管理器
返回结果到客户端
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup
defaultMQPushConsumerImpl.start()
defaultMQProducer.changeInstanceNameToPID()
brokerOuterAPI.start()
如果Channel为空,会执行this.bootstrap.connect,返回Channel
mQClientFactory.registerConsumer
创建线程池
producer.send
PullMessageService#start
创建并填充NamesrvConfig(nameServer业务参数)
start(controller)
push服务启动
scheduleAtFixedRate
KVConfigManager#load()
更新注册表
this.sendDefaultImpl
设置ip:port
addShutdownHook
注册broker
注册监听器,并处理监听到的消息
controller.start()
consumer.subscribe
通过netty发送消息,RequestCode.SEND_MESSAGE
创建客户端实例
this.offsetStore
DefaultMQProducerImpl#start()
rebalanceService.start()
注册请求处理器
startScheduledTask()
消费服务启动
向NameServer注册路由,并每隔10s发送心跳
设置订阅的Topic
每隔10分钟打印一次KV配置
remotingExecutor
consumer.start()
Broker启动
fileWatchService.start()
Producer发送消息
callback(response)
registerProcessor()
pull模式启动
channelRead0
consumeMessageService.start()
NettyRemotingClient#start
NettyRemotingServer#start
RouteInfoManager#scanNotActiveBroker
创建ServerBootstrap
Bootstrap
this.mQClientFactory.getMQClientAPIImpl().sendMessage
DefaultMQProducerImpl#send
设置producer名称为进程号
每隔10秒扫描一次broker,移除不活跃的broker
https://www.yuque.com/u21869650/al/bvv7kw#xelfa
processRequestCommand
Consumer启动
NamesrvStartup#main
doRegisterBrokerAll
ChannelPipeline.addLast
fastRemotingServer.start()
createBrokerController
Producer启动
执行启动controller
启动netty服务
new NettyServerConfig()
开启定时任务2
创建netty服务端
获取
consumer.registerMessageListener
创建remotingServer
服务本地注册
new DefaultMQPushConsumer
DefaultMessageStore#run
messageStore.start()
每隔1毫秒,读取commitLog转发到ConsumerQueue和IndexFile
创建Consumer
DefaultMQProducer
doReput()
registerBroker
this.sendKernelImpl
更新缓存
开启定时任务1
NettyRemotingAbstract#invokeOnewayImpl
创建并填充NettyServerConfig(nameServer网络参数)
扫描响应表
代码地址:
brokerAddrTable.put
DefaultRequestProcessor#processRequest
new NettyRemotingServer
sendHeartbeatToAllBroker
remotingServer.start()
缓存没有就从nameServer获取
返回结果
处理消费模式:集群|广播
mQClientFactory
NettyRemotingClient#invokeOneway
发送netty请求到NameServer
0 条评论
回复 删除
下一页