RocketMQ运行原理学习
2022-05-13 16:15:29 1 举报
源码学习
作者其他创作
大纲/内容
Consumer
HAConnecttion从节点连接
ProcessQueue数据结构
消费者组topic
BrokerName相同 demoBroker01
主从同步
写入
写入到磁盘
消息存储
磁盘
Linux系统
心跳数据
无通信
顺序写入(内存)
HAClient主从响应线程
commitLog磁盘文件
业务系统
执行成功
延迟消息_topic_%
ACK机制
定时检查
ConsumeMessageThread消息消费线程
transientStorePoolEnabled
执行业务逻辑代码
offset 偏移量
有两种消费模式一种是顺序消费、一种是并发消费ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService如何顺序消费呢?加锁
消息
indexFile
关联
nameserver注册中心
后台线程
与commitLog一一对应
写入消息
offset之后数据
根据offset读取数据
消息处理成功 / 失败
看做基于topic的索引文件
并发处理
MappedFiled内存映射(page chche)
size,4个字节
broker数据存储节点brokerId -> 0
HAClient主从请求线程
write queueread queue的区别(虚关系),物理上面不存在write在缩容的时候,read保持原样,可以继续消费
PullMessageService循环拉取消息
监听
queues
broker数据存储节点brokerId -> 1
延迟消息后台线程
read queue
创建
ConsumeQueue
每30s发起一次心跳
发送offset之后的数据
broker主节点
执行
消息失败处理机制
message消息
改写
listenner.consumeMessage()业务自己实现
读写队列:举个例子,读队列由4缩容到2,read queue还是4,可以保证consumer继续消费
处理消息
注册
管理LRU淘汰
一个broker对应一个commitLog
CONSUME_SUCCESS/RECONSUME_LATER
500ms
tag hashCode,8个字节
读取消息
写入延迟消息
改写为新的topic
peers对等数据架构
定位到commitLog磁盘, 起始位置
写入topic
每隔10s元数据同步
连接
发送主从同步请求max offset
异步刷盘pdflush内核线程
拉取topic路由信息每隔30s刷新一次
写数据
映射
BrokerName相同 demoBroker02
定时刷数据
jvm heap堆外内存
cluster相同 - demoCluster
根据消息数据查找
处理失败的消息
发送失败
每隔10s检查120s没有心跳摘除broker
按照field进行hash
Producer
topic本地缓存
负载均衡
重试机制
commitLog
offset,8个字节
0 条评论
下一页