普通消息生产详细流程
2022-04-12 17:22:11 11 举报
登录查看完整内容
普通消息处理流程,持久化
作者其他创作
大纲/内容
ReputMessageService
是
27.计算index已经存储到的位置 absIndexPos
12.ReputMessageService
否
3.监听生产者者
16.是否找到
记录该slot当前index,如果hash冲突(即absSlotPos一致)作为下一次该slot新增的前置index
index数量加1
23.对k取hash值k=topic + \"#\" +key
this.indexHeader.incIndexCount();
15.根据topic和queueid 查找 ConsumeQueue 对象
消息是否设置keys
5.SendMessageProcessor#processRequest()
写入到commitlog文件中。持久化
获取到
则将写入这个indexFile文件中第一条消息的物理偏移量和存储时间写到indexHeader中
mappedFile对象每次创建都保存到集合中,它本质上是缓冲区,消息内容先写到内部的ByteBuffer
HAService
17.返回ConsumeQueue 对象
26.从缓冲区中读取该位置slot存储的值slotValue
处理consumequeue
这里是优先写入缓冲区
从commit中读取消息
21.CommitLogDispatcherBuildIndex#dispatch()
这里刷盘都是异步的
结束
2.BrokerController#start()
生产消息
11.CommitLog#submitFlushRequest()
24.用k对hashSlotNum 取模得到slotPos(hashSlotNum=500w)
6.DefaultMessageStore#asyncPutMessage()
消息写到commitlog 过程
获取不到
开始写入消息
8.获取最新的MappedFile文件
消息索引写到consumequeue 和 indexFile 过程
开始
25.计算slot在indexFile的实际位置absSlotPos = 40 + slotPos * 4
ScheduleMessageService
何时刷盘?
同步
是否为延迟消息
处理indexFile
处理consumequeue 和 indexFile 的服务
NettyRemotingServer
1.先判断文件是否存在、是否写满2.如果不存在或者最后一个文件写满,则创建一个文件3.如果存在,直接返回该文件4.如果创建了新文件,启动一个线程将前一个写满的文件异步刷盘
slotValue = this.mappedByteBuffer.getInt(absSlotPos)
则创建 ConsumeQueue 对象,并且在内部会创建一个 MappedFileQueue 对象,保存了存储路径为 $HOME/store/consumequeue/topic/queueid最终返回 ConsumeQueue 对象
4.Producer
7.CommitLog#asyncPutMessage()
索引条目内容=commitlog 物理偏移量 + 消息大小 + tag 的hashcode其大小依次为:8byte + 4byte + 8byte
提交刷盘请求
10.将消息体内容写入到缓冲区
18.保存索引条目信息到缓冲区
消息分发
19.计算这个queueId下的ConsumeQueue 最大,最小物理偏移量
22.获取 IndexFile文件
异步
9.MappedFile#appendMessage()
14.CommitLogDispatcherBuildConsumeQueue#dispatch()
如果是第一次写入
20.刷盘结束
。。。。。
1.BrokerStartup#main()
29.然后将刚刚定位的indexCount写入到当前slot槽中
13.DefaultMessageStore#doDispatch()
0 条评论
回复 删除
下一页