RocketMQ Broker接收并存储消息流程
2022-03-16 15:26:46 8 举报
RocketMQ Broker接收并存储消息流程
作者其他创作
大纲/内容
唤醒异步刷盘服务线程去刷盘
是否批量消息
将消息存储到commitLog文件中
CommitLogDispatcherBuildIndex
判断刷盘类型
处理批量消息
......
processRequest
将消息写入MappedFile映射的一块内存
只是写入映射的内存
构建消息类
如果为空或文件满了,需要创建一个新的
消费者发送的重试消息
使用MessageStore组件将消息存储在本地文件
创建一个刷盘request提交给同步刷盘服务线程,并唤醒该线程去刷盘
CommitLogDispatcherBuildConsumeQueue
同步
获取commitLog文件中最新的消息
写入映射内存,由后台线程任务异步刷盘
处理单条消息
对写入的结果进行处理,成功则继续,失败就抛异常
获得最新的CommitLog文件
交由IndexService写入IndexFile
异步
等待刷盘完成
SendMessageProcessor
DefaultMessageStore
CommitLog
判断请求码
处理刷盘
检查消息发送是否合理
每次间隔1ms执行
这步还不是很清楚
处理broker之间的主从同步
判断几种消息拒绝写入文件的情况
生产者发送的普通消息
后台线程服务reputMessageService
遍历每条消息,分别转发给ConsumerQueue和IndexFile构建类
0 条评论
下一页