消费偏移量offset
2022-04-30 23:59:23 8 举报
理解rocketmq的消费偏移量offset
作者其他创作
大纲/内容
this.reputFromOffset += size
15.BrokerController#initialize()
long expectLogicOffset = cqOffset * 20cqOffset 就是上面步骤5中的第一个操作,第一次是0,然后根据 expectLogicOffset 获取MappedFile 文件(每个索引单元固定是20byte)
启动持久化定时任务
mappedFile文件写到哪里它是记录的
topicQueueTable.get(key)查询consumequeue的 offfset 是多少,key是\"topic-queueId\
getResult.setNextBeginOffset(nextBeginOffset);
1.PullMessageService
3
readSize += size
消息消费过程
20byte
4.MappedFile#appendMessage()
6.计算下一次的从consumequeue 中的哪个索引单元开始消费
Broker 启动时会启动很多定时任务,其中就有持久化 comsumer offset 的
reputFromOffset 是多少?第一次默认是0,继续往后看
13.发送netty 命令到broker
开始遍历读取到的消息,它这个遍历的维度是 “消息的大小size”
font color=\"#000000\" style=\"\
nextBeginOffset = offset + (i / 20)
开始处理从broker拉取到的消息
10.调用线程池定时任务持久化方法persistAllConsumerOffset()
2.DefaultMessageStore#asyncPutMessage()
在写入消息的物理便量commitlog offset读取消息最终是根据它来读取的
index=0
1
接收消息生产者发送的请求并开始保存消息到broker
14.ConsumerManageProcessor#commitOffset()
font color=\"#f44336\
7.将下次消费的索引单元的起始offset 设置到响应体中,返回给消费者,消费者下次消费在带过来
处理拉取到的消息
8.PullCallback
下次循环
消息分发处理
这样某个queueId 下的索引数据就写完了
MappedFile中有一个 wrotePosition 属性,会记录每次写入消息的位置,每次按照消息大小递增
MappedFile#wrotePosition.addAndGet(result.getWroteBytes())
5.遍历索引数据
5.开始准备消息体信息
根据topic和queueId 获取 ConsumeQueue 对象,一个queueid 对应一个 ConsumeQueue,一个 ConsumeQueue 对应一个 MappedFileQueue, 一个 MappedFileQueue 对应多个 MappedFile
消息生产过程
6.记录mappedFile文件写入的位置
从上面的响应结果中读出 nextBeginOffset,设置到 PullRequest 中的 nextOffset 属性中,等待下次拉取消息
16.ConsumerOffsetManager#persist()
当broker启动的时候会启动很多定时任务,其中就有消息分发(就是处理索引的)
2.PullMessageProcessor
RequestCode.UPDATE_CONSUMER_OFFSET
12.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker()
每次自增消息的总大小
offset = 5
先处理索引
3.DefaultMessageStore#getMessage()
9.开始遍历消息
只关注offset
4.处理索引数据
持久化
只分析和offset相关,有些过程我就直接略过了
11.从 offsetTable 中读取每个 queue 消费过了的 offset
offsetTable
就假设消费者开始第一次消费
20bye
这里所说的消费偏移量值的是集群模式下客户端的消费进度,默认在$home/store/config/consumerOffset.json 文件中
3.CommitLog#asyncPutMessage()
默认持久化到 $home/store/config/consumerOffset.json 文件中
消费者消费中和 offset 无关的操作
8.从commitlog读取消息CommitLog.getData(reputFromOffset)
17.将当前ConsumerOffsetManager序列化成JSON字符串(就是在序列化offsetTable 这个Map属性)
7.ReputMessageService
2
就是这个类
1.SendMessageProcessor
根据索引数据开始遍历读取消息
9.消费者客户端启动时会创建很多定时任务,其中就有持久化offset 的定时任务
记录了已经读取的消息,他是循环的判断条件,知道就行,不影响理解
将读取到的消息放入集合中,等待返回给消费者
0 条评论
下一页
为你推荐
查看更多