MQ_03RocketMQ高性能的核心原理
2023-04-27 17:25:52 11 举报
AI智能生成
RocketMQ高性能的核心原理
作者其他创作
大纲/内容
读写分离的思想。RocketMQ在最MessageQueue的路由策略时,就可以通过指向不同的队列来实现读写分离
会创建对应的存储文件,负责消息写入
写队列
会记录Consumer的Offset,负责消息读取
读队列
会有一部分写队列无法写入到读队列中,这一部分的消息就无法被读取,就会造成消息丢失
消息存入了,但是读不出来
写队列>读队列
一部分读队列里是没有消息写入的。如果有一个消费者被分配的是这些没有消息的读队列,那这些消费者就无法消费消息,造成消费者空转,极大的浪费性能
写队列<读队列
通常在运行时,都需要设置读队列=写队列
写队列=读队列
原来四个队列,现在要缩减成两个队列
先缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失
先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了
队列缩减·
场景分析
读队列与写队列
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定
存储消息的元数据
所有消息都会顺序存入到CommitLog文件当中
CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名
件大小是固定的,但是其中存储的每个消息单元长度是不固定
具体格式
文件结构
CommitLog
存储消息在CommitLog的索引
一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog
msgPhyOffset(8byte,消息在文件中的起始位置)
msgSize(4byte,消息在文件中占用的长度)
msgTagCode(8byte,消息的tag的Hash值)
常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小
固定由30万个固定大小20byte的数据块组成
ConsumerQueue
为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
indexHeader(固定40byte)
slot(固定500W个,每个固定20byte)
index(最多500W*4个,每个固定20byte)
IndexFile
存储文件主要部分
数据存盘检查点
主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳
checkpoint
将RocketMQ的一些关键配置信息进行存盘保存
例如Topic配置、消费者组配置、消费者组消息偏移量Offset
config/*.json
RocketMQ用来判断程序是否正常关闭的一个标识文件
正常情况下,会在启动时创建,而关闭服务时删除
如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作
abort
辅助的存储文件
好处是可以减少查找目标文件的时间,让消息以最快的速度落盘
Kafka需要寻找消息所属的Partition文件,再完成写入
当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的场景
RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显
对比kafka
1.所有生产者发过来的消息,都会无差别的依次存储到Commitlog文件当中
2.每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量
每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量
消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录
消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中
3.ConsumeQueue文件主要是加速消费者的消息索引
消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了
如果要按照MeessageId或者MessageKey来检索文件IndexFile文件就是用来辅助这类消息检索的
4.IndexFile文件主要是辅助消息检索
要点
消息持久化
消息既然要持久化,就必须有对应的删除机制
CommitLog文件和ConsumeQueue文件都是以偏移量命名
对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除
保留时间就是在broker.conf中配置的fileReservedTime属性
所以,RocketMQ的消息堆积也是有时间限度的
RocketMQ判断文件是否过期的唯一标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过
判断过期文件
RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作
用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定。默认是凌晨四点
broker的磁盘空间不要少于4G(官方建议)
RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定的阈值,也会触发过期文件删除
删除过期文件
过期文件删除
所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝
应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据
这些操作,原本都是需要由CPU来进行任务的分配、调度等管理步骤
当发生大规模的数据读写操作时,CPU的占用率会非常高
CPU拷贝
由DMA来负责这些频繁的IO操作
DMA是一套独立的指令集,不会占用CPU的计算资源
DMA拷贝
应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,每次状态切换的过程中,就需要有大量的数据复制
在这个过程中,总共需要进行四次数据拷贝
磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化成了DMA拷贝
零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝
文件的内存起始地址
文件大小
真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制
mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射
内核态与用户态之间的拷贝依然是CPU拷贝
文件的读写操
mmap文件映射机制
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝
在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据
最后,sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的
sendfile机制非常适合大数据的复制转移
sendFile机制
零拷贝技术加速文件读写
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片
这个过程中有大量的寻址操作,会严重影响写数据的性能
我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写
顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行
DefaultAppendMessageCallback的doAppend方法。在这个方法中,会以追加的方式将消息先写入到一个堆外内存byteBuffer中,然后再通过fileChannel写入到磁盘
顺序写加速文件写入磁盘
当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中
PageCache缓存以4K大小为单位,缓存文件的具体内容
PageCache依然是内存状态,所以一断电就会丢失
需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘
操作系统只会在某些特定的时刻将PageCache写入到磁盘
对于有数据修改的PageCache,会标记为Dirty(脏页)状态。当Dirty Page的比例达到一定的阈值时,就会触发一次刷盘操作
操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘
PageCache
在返回写成功状态时,消息已经被写入磁盘
消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态
同步刷盘机制会更频繁的调用fsync,所以吞吐量相比异步刷盘会降低,但是数据的安全性会得到提高
同步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大
当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
异步刷盘
刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个
配置方式
刷盘机制保证消息不丢失
高效文件写
Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态
在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量
同步复制
异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点
在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失
异步复制
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个
消息主从复制
Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的
Producer负载均衡
Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式
集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可
每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例
将同机房的Consumer和Broker优先分配在一起
可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则
还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配
AllocateMachineRoomNearby
平均分配。将所有MessageQueue平均分给每一个消费者
AllocateMessageQueueAveragely
轮询分配。轮流的给一个消费者分配一个MessageQueue
AllocateMessageQueueAveragelyByCircle
不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列
AllocateMessageQueueByConfig
按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置
AllocateMessageQueueByMachineRoom
一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀
AllocateMessageQueueConsistentHash
分配算法分别对应AllocateMessageQueueStrategy下的六种实现类
集群模式
每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说
Consumer分配Queue时,所有Consumer都分到所有的Queue
关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量
广播模式
Consumer负载均衡
负载均衡
RocketMQ高性能的核心原理
0 条评论
下一页