RockerMq从源码到跑路
2022-04-27 13:00:58 15 举报
RockerMQ,一款开源的分布式消息中间件,从源码到上线,历经千锤百炼。其设计理念注重可靠性、高吞吐量和低延迟,为各种业务场景提供了强大的支持。然而,随着时间的推移和技术的更迭,RockerMQ逐渐暴露出一些问题,如性能瓶颈、维护成本高等。为了追求更高的效率和更好的用户体验,开发者们决定对其进行优化和升级。经过一番努力,新版RockerMQ应运而生,它不仅继承了原版本的优秀特性,还在性能、稳定性和易用性上有了显著提升。最终,RockerMQ完成了它的使命,悄然离去,留下了一段传奇的历史。
作者其他创作
大纲/内容
defaultMQProducer
NameServer启动
BrokerServer启动
start(controller)
controller.start()
CommitLog.putMessage
启动之前初始化一大堆组件
NamesrvConfig()配置信息
CommitLogDispatcherBuildIndex
DefaultMQProducer
NettyServerConfig
addShutdownHook
初始化网络通信组件
核心组件
分析文件存盘的入口
this.sendKernellmpl实际发送消息
mappedFile.appendMessage写入文件(文件顺序写)
广播模式
TopicConfigManager管理Topic的功能组件
并行消费
this.defaultMQProducerOImpl.start()
messageStore的创建、加载、分发、存盘
this.serviceState服务状态在启动的时候是CREATE_JSUT
createBrokerController
brokersSent[times] = mq.getBrokerName();根据队列找到Broker
start
2个组件可以看到Broker和客户端建立了长连接
BuildIndex分发
DefaultMQProducerImpl.sendDefaultlmpl
flushCommitLogService的实现类:同步刷盘:new GroupCommitService()异步刷盘:new FlushRealTimeService()
注册几个核心配置对象
consuemMeassageSerivce
if(msg.getDelayTimeLevel() >0) 延迟消息写入对应的SCHEDULE_TOPIC_XXX的系统主题中
run()
AllocateMeassageQueueBycircle
mQClientFactory.start()
异步刷盘
BrokerController.this.registerBrokerAll定时注册心跳(分析路由注册的入口)
remothingServer处理netty请求
DefaultMQProducerImpl.send()
this.offsetStore.laod()
traceDispatcher.start()
RemotingServer
AllocateMeassageQueueAveragely
String brokerAddr = this.mQClientFactory.findBrokerAddresslnPublish(mq.getBrokerName()) 发送时会去NameServer同步路由信息本地缓存中找Broker,找不到级去NameServer请求
brokerOuterAPI.needRegister
deleteExpiredFiles
AllocateMeassageQueueRoomNearby
只能通过-c:指定配置文件-p -m 打印配置信息
offsetStore
NamesrvStartup
handleDiskFlush处理文件刷盘
集群模式
BrokerStartup
doRegisterBrokerAll
routeInfoManager.sacnNotActiveBroker()定期扫描不活跃的Broker
Consumequeue分发
main0
🔴routeInfoManager 路由管理组件
同步刷盘
treeDispatcher.start()消息轨迹相关
BrokerConfig
初始化业务功能组件
关闭定时任务
controller.initialize()
顺序处理
this.reputMessageService = new ReputMessageService()
AllocateMeassageQueueByMachineRoom
服务状态更变
-c:指定配置文件-p:指定属性值nettyServerConfig设置端口号
main
加载kv配置、定时打印、创建线程池、初始化NettyRemotingServer
ServiceState.RUNNING
NettyConfig
dispatcher.dispatch(req)
new BrokerContoroller
DefaultMessageStore.putMessage
对Topic的各种管理包括系统和业务的TOPIC
commitLogService.wakeup()
mQClientFactory
this.consumeMessageService.start()
AllocateMeassageQueueConsistentHash
consumeMessageConcurrentlyService.submitConsumeRequest
间隔1s
controller.shutdown()
DefaultPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;推模式的消费者最终还是会使用啦消息的方式 impl.pullMessage(pullRequest)
DefaultMQConsume
DefaultMessageStore.this.doDispatch(dispatchRequest)
各类线程池
32条一页
defaultMQPushConsumer
consumeMessageOrderlyService.submitConsumeRequest
关闭remothingServer
pullCallback
CleanConsumeQueueService
start(final NamesrvController controller)
注册服务关闭钩子
NettyServerConfig()网络配置信息
队列加锁
各种Send
pullMessageService.start拉消息服务
mappedfile映射commitLog文件
brokerOuterAPI 网络请求组件
consumer管理consumer的功能组件
根据状态,不重复的启动一堆实例
this.executeSendMessageHookAfter(context)消息发送完成之后执行钩子程序
this.consumeExecutor.submit(consumeRequest)
MessageStoreConfig
fastRemotingServer
service.putRequest(request)消息写入
ProducerManager管理Producer的功能组件
this.defaultMQPushConsumerImpl.start()
Producer启动
createNamesrvController
mQClientFactory.registerProducer注册Producer
rebalanceService.start负载均衡服务
flushCommitLogService.wakeup()
handleHA处理主从同步
请求到Broker的Netty服务接口
CommitLogDispatcherBuildConsuemeQueue
管理broker列表
cleanCommitLogService
this.tryToFindTopicPublishlnfo(msg.getTopic())根据Topic选择队列
状态变成RUNNING
AppendMessageCallback.doAppend
初始化mQClientFactory
ReblanceImpl.rebalanceByTopic
启动remothingServer
rebalanceImpl
this.mQClientFactory.getMQClientAPIImpl().sendMessage 发送消息
MQFaultStrategy.selectOneMeassageQueue()递增取模的方式选择messageQueue
AllocateMeassageQueueStrategy
PullRequest PullRequest = this.pullRequestUQueue.take()this.pullMessage(pullRequest)
this.doRequt()
删除过期失效的文件
收藏
收藏
0 条评论
下一页