RocketMQ源码分析_草稿图
2024-03-04 21:46:53 3 举报
这是一份草稿图,展示了RocketMQ的源码分析。图中详细标注了RocketMQ的主要组件和模块,包括NameServer、Broker、Producer、Consumer等。每个组件都用不同的颜色进行了区分,以便于理解和记忆。此外,图中还标注了一些关键的代码逻辑和数据流,如消息的发送和接收过程,以及各种消息过滤和处理策略。这张草稿图是学习RocketMQ源码的重要参考,可以帮助开发者快速理解其工作原理和设计思想。
作者其他创作
大纲/内容
保存消费位点
Java进程
5
#2
启动通信模块服务
4
对比已同步位点
CommitLog
Yes:返回Pull结果
带Tag的消息
异步转存服务
执行本地事务
初始化位点管理器,并且加载消费位点
1
start()
initialize()
本地事务是否成功
索引数据
按照Offset Pull消息
Push消费启动流程
返回生产者存储成功
启动rebalance服务
Tag过滤流程
1.保存前执行Hook2.解析请求参数3.死信重试消息处理4.保存后执行Hook5.返回结果
启动消费者实例
同步刷盘
handleHA()
消费消息
消费者
#5.4
业务消费代码
启动消费者实现类
启动Pull服务
通知刷盘请求处理结果
发送消息
等待offset同步完成锁
映射物文件
文件内容按Page保存
consumeMessage()
Half消息存储(此时不会被消费)
BrokerStartup启动
置为服务启动失败状态
拷贝订阅关系
查找CommitLog中的消息体内容
CommitLog异步刷盘成功后
启动
No
初始化Checkpoint文件
加载全部ConsumeQueue文件
Span=(N+K)-N
订阅
ClassRebalancePushImpl
s
DefualtMQProducer
....
将生产者状态置为启动失败
1.同步Topic配置2.同步消费位点信息3.同步延迟位点信息4.同步消费订阅信息
Master Broker
00000000236222320128
#1
MappedByteBuffer
config
SendMessageProcessor.java
2
0000000000000
Yes
初始化消息拉取服务
Broker异常退出检查
执行全部刷盘
拉取数目是否满足
文件内容
./bin/mqshutdownRocketMQ环境设置
NamesrvStartup.java
返回消息
发送消息请求
拉取消息
.....
Bloom过滤器是否通过
6789
启动各种存储服务
./bin/mqnamesrv
doCommit()
4字节
HAService
createTempFIle()
第1个子文件
MappedFileQueue
下一轮询
send()
commitlog
RMQ_SYS_TRANS_OP_HALF_TOPIC
查询消息
进程正常关闭时
Push消费流程
Pull线程
异步刷盘
20200420204115925
Consume Queue添加索引成功
#5
用户态内存
服务是否停止
NO
零拷贝读写文件流程
保存刷盘请求
Tag的Hash值
取消之前发送的Half消息(此时不会被消费)
abort文件创建流程
CommitLog复制流程
操作系统Page Cache
20200421055715499
是否有新消息
轮询全部队列Pull消息
shutdown()
doReput()
物理CommitLog文件2
Direct Memory-Page Cache读写分离
Yes:返回Pull结果
DefualtMQPullConsumer
SQL过滤是否通过
结束
执行刷盘
预读
DefaultMQProducer
文件映射-写
interfaceMQConsumer
ClassRemoteBrokerOffsetStore
一个消费者实例下线后Rebalance过程
发送Half消息
是否有推荐的Broker id
extends
返回全部消息
ClassRebalamcePullImpl
MappedFile实例2
RocketMQ Broker
生产者启动流程
MQClientAPIImpl
磁盘
Broker关闭流程
配置数据同步流程
Broker
线程等待刷盘请求处理结果通知
拉取前进行check
Consume Queue刷盘成功
N
N+1
N+2
N+K
20200421140640517
unregister_client
构造函数初始化实例
发送刷盘请求
同步完成,等待被唤醒
消息返回给消费者消费
编译SQL
调用
./bin/runbroker.sh
Pull
主要的发送逻辑:1.发送前check2.选择Queue发送3.可靠发送4.发送结果处理
拉取消息并进行消费
DefualtMQPushConsumerImpl
返回客户端消息存储结果
临时存储
文件映射-读
processRequest()
执行转存
putMessage()
BrokerController.java
1.恢复TopicQueue关系数据2.纠正ConsumeQueue最小消息位点信息
消息对象
GroupCommitService
DefaultMQProducerImpl
pullMessage()
BrokerStartup.java
appendMessage()
interfaceMQAdmin
更新logincsMsgTimestamp(最后一条写Consume Queue消息的存储时间)
CommitLog查询消息
保存消息体
#5.1
Commit/Rollback
启动生产者实例
注册生产者
notifyTransferSome()
DefaultMessageStore
MQClientInstance
拉取配置信息
Master BrokerOuterAPI
启动消息拉取服务
初始化Pull消费者
invokeOneWay
生产者
持久化消费位点
ClassLocalFileOffsetStore
默认生产者组名转换为进程id
Hash槽位
存储成功
lock
临时存储消息
yes
pull请求
8字节
删除aobrt文件
置为服务启动状态
Master Slave读写分离
启动RebalanceService服务
设置JVM参数
关闭各种存储服务、刷盘服务
call
Header
启动BrokerController
业务消费消息
关闭各个服务1.启动Broker监控服务2.启动Client探活服务3.关闭长轮询服务4.关闭远程通信服务Server5.关闭VIP通信服务Server6.关闭messageStore服务7.关闭定时任务8.取消注册Broker-to-Namesrv9.关闭消息发送处理线程池10.关闭Pull请求处理线程池11.关闭Admin管理请求处理线程池12.关闭brokerOuterAPI服务13.持久化消费位点14.关闭Filter Server服务15.关闭快速失败服务16.持久化消费过滤进度信息
加载延迟消息的位点信息
subscriptionGroup.json
Push消费者拉取消息过程
记录异常关闭日志
恢复全部数据
线程池
checkpoint
RemotingClient
启动失败
循环
0
synchornizedswapRequests()
classDefaultPullConsumer
DefaultMessageStore.java
Pull 消息
无消息
abort
处理同步刷盘请求
返回拉取结果
异常恢复CommitLogrecoverAbnormally()
启动脚本
DefualtMQProducer构造函数初始化实例
调用发送方法
启动Push服务
发送失败
查找Consume Queue中位点对应的PageCache
第2个子文件
加载全部ConsumeQueue数据
更新indexMsgTimestamp(被刷盘的indexFile的最后更新时间)
带SQL的订阅信息
...
Filter Server
回查服务
NamesrvController.java
设置同步发送
3
全部Queue
topics.json
Pull Consumer
YES
同步刷盘过程
20200420134806913
Slave BrokerController
#5.3
开始
更新当前Slave已同步位点
消息成功存储到映射文件
等待转存时间间隔
创建abort文件
消息物理位点
设置超时时间
物理CommitLog文件N
更新physicMsgTimestamp(最后一条刷盘的CommitLog存储时间)
CommitLog同步刷盘成功后
sendKernelImpl()
初始化Pull接口包装类
查询Topic全部Queue
执行全部转存
启动各种定时任务
1.将消费者状态置为启动失败2.消费者参数校验3.默认消费者组名转换为进程id
查找Consume Queue
处理关闭消费者实例
main0()
HA
记录Checkpoint信息
同步刷盘服务线程
提交消费位点
向Pull接口包装类注册消息过滤器
消息不返回消费者
HAConnection
MessageListener
恢复ConsumeQueue
执行一次rebalance
KV配置加载、初始化通信层,初始化各个定时任务
ClassGroupCommitService
interfaceRunnable
Slave Broker
doAppend()
6
本地注册消费者
业务层
DefualtMQProducerImpl
B、C、D消费者实例
生产者参数校验
notify_consumer_ids_changed
delayOffset.json
线程
MappedFile实例M
加载IndexFile索引
RocketMQ Client
BrokerController
映射物理文件
直接内存(DM)
存储消息线程
通知复制
Broker保存消息流程
JVM Shutdown Hook
启动命令
consumeOffset.json
获取Topic全部队列信息
Java堆内存
链表
syncall()
Abstract ClassRebalanceImpl
链表下一个节点指针
处理同步复制
通过createBrokerController()方法解析命令行参数,初始化全部配置:1.初始化Remoting配置2.初始化Broker配置3.初始化Broker存储配置4.通过JVM Hook关闭进程
启动Client实例
执行过滤方法
全部CommitLog是否已创建索引
命令行参数处理启动配置文件加载
设置消息对消费者可见
appendMessagesInner()
拉取并消费消息
registerProducer
CommitLog.java
是否成功
异步实时刷盘
异步定时刷盘
数据一致性
中
低
高
数据可靠性
数据可用性
系统吞吐量
唤醒异步刷盘线程
时间差
忽略该消息
准备消息对象
事务消息发送过程
A消费实例
拉取消息、Master高负载判断
1.保存Topic配置2.保存消费位点信息3.保存延迟位点信息4.保存消费订阅关系
load()
Broker启动流程
Direct Memory
GroupTransferService
MappedFile实例1
BrokerStartup JVM关闭hook
waitForRunning(10)
是否发送成功
ConsumeMessageService
./bin/runserver.sh
关闭命令
FlushRealTimeService
初始化Push消费者
MappedFile实例N
Topic
No:Rollback消息
Index File刷盘成功
no
1234
Yes:Commit消息
implements
shutdown
内核态内存
consumequeue
存储消息
./bin/mqbroker
#3
index
waitForFlush()等待同步Slave结果
发送网络请求,发送消息
interfaceMQPullConsumer
是否异常退出
正常恢复CommitLogrecoverNormally()
文件头
物理CommitLog文件M
无Hash碰撞
ClassServiceThread
Namesrv启动流程
copy
消息发送流程
消费方式/对比项
Push
备注
是否需要主动拉取
理解分区后,需要主动拉取各个分区消息
自动
Pull消息灵活;Push使用更简单
位点管理
用户自行管理或者主动提交给Broker管理
Broker管理
Pull自主管理位点,消费灵活;Push位点交由Broker管理
Topic路由变更是否影响消费
否
Pull模式需要编码实现路由感知,Push模式自动执行Rebalance,以适应路由变更
Pull消费启动流程
是否初始化启动
Index Data
保存消费进度
main()
注册本地路由信息
PageCache
putRequest()
等待刷盘时间间隔
执行方法start()
物理CommitLog文件1
sendDefaultImpl()
CommitRealTimeService
SlaveACK的数据位点是否大于Master上标记的位点
执行方法start()1.启动messageStore服务2.启动远程通信服务Server3.启动VIP通信服务Server4.启动BrokerQuterAPI服务5.启动长轮询服务6.启动Client探活服务7.启动FilterServer服务8.注册Broker-to-Namesrv9.启动自动注册Broker-to-Namesrv定时任务10.启动Broker监控服务11.启动快速失败服务
抛出程序异常
发送request封装
消费者拉取消息
interfaceOffsetStore
Pull请求
执行方法main()
获取刷盘参数
#5.2
ClassCOmmitRealTimeService
消息Key的Hash值
在Consume Queue中找到下一条消息物理位点、消息大小、Tag的Hash值
CREATE_JUST
DefaultMQPullCOnsumerImpl
PullMessageService
FilterServer过滤流程
submitConsumeRequest()
启动网络通信模块
设置重平衡服务参数
功能点
Zookeeper
Namesrv
角色
协调者
配置保存
持久化到磁盘
保存内存
是否支持选举
是
强一致
弱一致,各个节点无状态,互不通信,依靠心跳保持数据一致
是否高可用
设计逻辑
支持ZAB选举,逻辑复杂难懂,排查问题较难
CRUD,仅此而已
Pull消费流程
存储恢复过程
Thread.sleep(10)
HashSlot
notifyTransferObject.wakeup()
消息不返回给消费者
初始化定时任务
DefaultMQPushConsumerImpl
调用初始化方法
消费
查询Queue的消息
初始化、启动Push消费服务
ConsumeQueue
invokeAsync()
invokeSync()
ClassFlushRealTimeService
获取转存参数
保存前校验1.存储服务关闭或者设置不可写时,不能写2.Slave Broker不能写3.Topic、扩展信息太大不能写4.PageCache忙不能写
是否被Tag的Hash过滤掉
客户端
WriteSocketService将请求的offset之后的数据发送给Slave
Hash碰撞
#4
abort文件删除流程
正常同步刷盘
服务名
功能
SlaveSynchronize
Slave从Master同步配置数据的服务
Slave从Master同步CommitLog数据
Slave连接信息
HAConnection.WriteSocketService
Master:将CommitLog写入网络,发送给SlaveSlave:上报本地offset的请求
HAConnection.ReadSocketService
Master:读取Slave发送的offset请求Slave:读取Master发送过来的CommitLog数据
HAClient
Slave处理与Master通信的客户端封装
同步复制时,提供新数据通知服务
AcceptSocketService
Master接受Slave发送的上报offset请求的服务
初始化Push消费者实例
MappedFile.java
启动生产者实现类DefualtMQProducerImpl
DefaultMQPushConsumer
消息体大小
读取文件流程
加载全部CommitLog文件
物理位点
1235
SQL过滤流程
提交消费状态
关闭BrokerController
IndexService
RMQ_SYS_TRANS_HALF_TOPIC
Thread.sleep()
第一次Tag过滤条件
classClientConfig
第二次Tag过滤满足条件
ReadSocketService接收Slave上报Offset请求
序列化消息,保存到Page Cache
0 条评论
下一页