Broker数据存储
2021-07-20 21:28:08 2 举报
rocketmq之broker存储
作者其他创作
大纲/内容
宕机或扩容
消息
CommitLogDispatcherBuildConsumeQueue
预读取
Broker接收到消息后,存入commitlog磁盘文件中,每个commitlog文件大小1GB,顺序写入
ReputMessageService线程间隔1毫秒
同步刷盘
DefalutMessageStore.cleanFilesPeriodically方法fileReservedTime指定删除文件的时间,默认72小时
获取要转发的消息
消息数据
进入rebalance环节,重新进行messagequeue的分配;将所有的messagequeue负载重平衡到所有的消费者机器。
寻找消息
OS Cache
写入文件
commitlog顺序存储broker接受到的真实消息,而每个topic都messagequeue都对应一个consumequeue文件,此文件记录了messagequeue的消息地址,也就是偏移量。可以通过此文件找到某个topic的某个messagequeue在commitlog中的具体的消息。
FlushCommitLogService--线程CommitRealTimeService.run()CommitLog.this.mappedFileQueue.commitmappedFile.commit(xx)this.fileChannel.write(byteBuffer)
GroupCommitServiceCommitLog.this.mappedFileQueue.flush(0)mappedFile.flush(xx)this.mappedByteBuffer.force();
异步刷盘
messagequeue
储存地址
写入内存
对应
本质上是消费者组,找到topic对应的messagequeue所在的broker,然后通过messagequeue找到对应的consumequeue,拿到消息的偏移量,再从commitlog中读取消息
MappedByteBuffer内存映射缓冲区
定期清理
磁盘
doReput方法
消息消费
消费消息
引用
1.先写入System Page Cache2.OS线程异刷入到磁盘commitlog文件3.commitlog文件采用顺序写入4.存在消息丢失的风险,broker宕机,消息会丢失
1.先写入System Page Cache中2.同时将消息刷入到磁盘commitlog文件中3.如果broker宕机,则对于producer来说是消息发送失败4.消息不会丢失,但是吞吐性能会下降
this.cleanConsumeQueueService.run();
broker
topic
先写入系统缓存
CommitLogDispatcherBuildIndex
优先写入
转发消息
commitlog存储位置:rocketmq安装目录,store/commitlog/目录
删除条件1.当磁盘使用率达到85%时触发2.磁盘没满,则每天定时清理超过72小时的文件(包括未消费的消息);broker.conf中deleteWhen指定任务开始执行时间,默认04,也就是凌晨4点
MappedFile映射内存
this.cleanCommitLogService.run();
预读取一部分数据,方便高效查询
对应consumequeue文件,rocketmq安装目录,store/consumequeue目录,目录下以topic名称命名的目录store/consumequeue/{topic}/{queueId}/{fileName}
消息持久化
IndexFile
consumequeue文件储存messagequeue消息在commitlog中的地址,也就是偏移量
异步
消息者机器消费消息完毕后,会回传一个消费进度到broker上,也就是offset,然后broker去存储消费进度。
0 条评论
下一页