RocketMQ
2022-05-30 22:44:40 0 举报
AI智能生成
整理了RocketMQ的消息发送、消息存储、消息消费、集群等知识点
作者其他创作
大纲/内容
用来取代zk的,是rocketMQ的大脑所以也要先启动NameServer
NameServer比Zk性能高原因:不像ZK那样做很多事情
NameServer
基于文件顺序读写,内存映射机制
容忍设计缺陷:消息只消费一次
可以保证消息顺序消费
有消息文件过期机制
存储空间预警机制
消息堆积能力
消息顺序存储在同一文件中(commitlog)
消息存储性能
消息存储
消费者可以过滤消息(tag)
同步刷盘不会丢失
异步刷盘会少量丢失
单节点开启异步复制时,只丢失少量
可引入双写机制
消息高可用
不发生消息堆积时,以长轮询模式实现准实时的消息推送
消费低延时
利用ack确保消息至少消费一次
已经消费完的消息,可以根据业务要求重新消费
消息回溯
消息只能在指定时间之后才能被消费【16个等级,如5秒,15秒等】收费版才支持任意时间精度
定时消息
重发时不会发给同一broker,因为有可能该broker已经挂了【规避原则】
消息重试机制
RocketMQ设计思想
普通消息
延时消息
事务消息
消息类型
基础
发送同一类消息的生产者称之为一个生产者组
如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态
Producer Group
Producer 实例线程安全
Producer
业务层面的唯一性标识,可以用来查找消息(有索引文件)
RocketMQ不能保证messegeId唯一,所以要设置唯一key
Message里的 Key
不同的消息类型(定时消息、事务消息)使用不同的主题
不同的业务类型消息(天猫订单、京东订单),使用不同的主题
不同的消息量级使用不同的主题
一个topic下默认4个分片,RocketMQ里没有队列的概念
topic
二级消息类型,用来进一步区分某个 Topic 下的消息分类,用于消费端过滤消息
淘宝交易订单的不同类型(女装订单、电器、化妆品订单)使用不同的tag
tag
单向发送:oneWay
发送后会在等到响应之后才算是发送完
消息的全局唯一标识:机器 IP+消息偏移量【所以可能重复】
MessageId
发送标识:成功、失败等
点击这里查看SendResult里状态标识
SendStatus
RocketMQ收到消息后,所有主题的消息都存储在commitlog中,然后采用异步转发到consumerQueue中ConsumeQueue中,每个主题的数据会做数据分片(Queue)。Kafka中的存储是没有队列的概念的
一个Topic,默认有4个Queue,就不能保证顺序了。会返回给生产者一个queueId
Queue
同步发送
消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败
关注 bug师姐 ,gongzhonghao回复:扣扣号 获取联系方式可免费获取编辑版本
异步发送
验证消息、查找路由(第一次从NameServer)、消息发送
生产者流程
一批的消息不超过4M,且具有相同主题、标签等。组装成一个List
批量发送
重发时,不会再发给同一broker【规避原则】,以提高重发成功率
消息重试
深入消息发送
当发送的消息不重要时,采用one-way方式,以提高吞吐量;
当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
选择
消息发送
DefaultMQProducerImpl
不管怎么重试,总的超时时间是不会超过方法入参里的timeout的
源码类名#方法名
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
从上面源码可知,同步发送才有重试次数,单向发送和异步发送只有一次发送机会
getRetryTimesWhenSendFailed(),默认重试次数是 2
重试次数
由下面源码可知,如果获取MessageQueue和Topice超时超过总的超时限制(方法传进来的超时时间)则不会发送消息
long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) { callTimeout = true; break; // 跳出循环不走后面的发送代码,也不会重试 }
重试
Producer端重试
Consumer默认是16次
consumer重试有时间间隔,可在配置文件中配置
在广播模式下不会进行重试
异常重试:消费者发生异常在catch块里主动返回RECONSUME_LATER时会进行重试
超时重试:是指消费者没有返回RECONSUME_LATER,也没有返回consume_success时,会一直重试到最大重试次数
broker会一直重试到16次(默认)后丢进死信队列,已和架构组确认
3.2.6不对超时进行处理(即broker不会重发消息),在重启消费者的时候会触一次rebalance,重新找一个消费者处理。
4.x时如果消费者一直不回复broker,则broker会一直重试,直到最大重试次数后进入死信队列
版本问题
如果broker配置的重试次数是16,但我们消费者只想重试3次怎么办?
在消费代码的catch块里,获取重试次数reconsumeTimes,如果到了3次,直接返回成功
减少重试
若出现网络问题导致消息根本没有发送到消费者,则会重新投递
网络问题超时处理
Consumer端重试
重试机制
使用相同group Id的订阅者属于同一个消费群组
群组
一条消息只需要被群组内的任意一个消费者消费即可
群组消费(集群模式)
每条消息,会推送给群组内的所有消费者,且保证被每个消费者消费一次
广播消费(广播模式)
基本概念
不会保证失败重新投递时路由到同一台机器上
群组消费特点
不支持顺序消息,也不支持重置消费位点
每条消息要被群组的每台机器都消费一次
不会对消费失败的消息进行重新投递【业务自己关注】
消费者重启后,会从当前最新的消息开始消费,停服期间的消息不能再消费
广播消费特点
会将broker里的Queue都拉过来,也可以指定MessageQueue读取消息
读取的offset,需要自己保存处理;而推模式很多事都不用管,比如偏移量直接在broker里
拉取消息后,返回结果会有以下四种状态之一:Found(获取到消息),没有匹配到消息,没有新消息,非法偏移量
//客户端从broker一次拉取后,实际消费的最大数量 <= pullBatchSize private int consumeMessageBatchMaxSize = 1;
//客户端从broker一次拉取最大数量 <= 服务端的实际参数限制 private int pullBatchSize = 32;
拉取消息数量配置注意
拉模式
Consumer控制长轮询,broker发送请求时,5s内有新消息时,还是通过该连接返回给消费者
长轮询
判断 消息个数、消息总大小、offset 【最大和最小差距】三个指标,超过任一设定值时,就等会儿再拉取
流量控制
消息队列重新分布由MQClientInstance(最大的一个实例)持有的RebalanceService实现【专门一个线程处理】所有生产者都持有一个MQC,所有消费者持有一个MQC
一个消费者可以分配多个消息队列,同一消息队列只能分配给一个消费者ReblanceService每隔20S进行一次队列重新分配每次重新分配时,会查回所有消费者,并对消费者列表和消息队列排序
平均分配【优先】:AllocateMessageQueueAveragely
平均轮询分配【优先】:AllocateMessageQueueAveragelyByCircle
一致性Hash【不推荐】:AllocateMessageQueueConsistentHash
根据配置指定:AllocateMessageQueueByConfig
根据broker部署机房名:AllocateMessageQueueByMachineRoom
分配算法(5种)Rebalance策略
重新分布机制
PushConsumer会通过返回ConsumeConcurrentlyStatus里的不同状态作出不同处理
表示真正消费成功了
CONSUME_SUCCESS
稍后重试:默认是10秒,默认最多重试16次,最后会投到死信队列
可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔
消息返回的是一个新的重试topic:%retry%+consurmergroup
集群模式时,Broker才会自动进行重试
RECONSUME_LATER
RocketMQ以consumerGroup+queue为单位管理消费进度只会记录批次消息中最小的offset值作为消费进度:保证都是消费成功问题是重新投递时可能会重复,需要自己保证幂等
如果超时了,既没有返回consume_success也没有返回reconsume_later,则broker认为消息没有发送到消费者,则也会重试
消息确认(ACK)
消费进度都存在本地:.rocketmq_offsets
广播模式
消息进度存在broker上
集群模式
消息进度存储
推模式,是先通过拉模式拉到本地后,再推送给PushConsumer的
拉取都是批量拉取的,再分发给PushConsumer的。流量控制这些也是拉取者做的
PushConsumer是一条条提交的,而拉取者本质上还是批量提交,且只提交最早的那个偏移量
拉的时候是阻塞的,所以不会数据混乱(Kafka是多线程的)
推模式原理
推模式(拉模式封装)
Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置
集群模式时,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构
广播模式时,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地
rocketmq的指定offset消费,只能是刚启动时有用,后面都是接着上次的消费进度消费
offset详解
消费方式
消息消费
生产者和消费者的并发设置成1,Queue也设置成1【性能不高】
mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876
全局顺序消息
一个主题根据sharding key进行分区,在同一分区内保证生产和消费的FIFO【通过不同的tag实现的】
部分顺序消息
顺序消息
等待一段时间后才投递给消费者
比如创建订单时发送一条延时消息,30分钟后投递给消费者,消费者就会去检查是否支付,未完成则关闭订单
共16个级别。不支持任意精度的解释是:任意精度的延时需要做消息排序,这是有损耗的
msg.setDelayTimeLevel(5)代表延迟一分钟:下标从1开始1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
解决问题:本地事务执行与消息发送的原子性问题
1、应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。 默认的RMQ_SYS_TRANS_HALF_TOPIC主题2、prepare消息发送成功后,应用模块执行数据库事务(本地事务)。 在实现TransactionListener接口的类里处理3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ。4、如果是Commit,MQ把消息下发给Consumer端【此时移到真实Topic里】 如果是Rollback,直接删掉prepare消息。5、如果第3步的执行返回结果是UNKNOW,或是超时的, 则启动定时任务(自己开发,麻破火山口)回查事务状态(最多重试15次, 超过了默认丢弃此消息),处理结果同第4步。也是在TransactionListener实现类里处理6、MQ消费的成功机制由MQ自己保证。
外框
事务流程
主要是为了不确认是否真正成功时的一个查询判断
通过transactionCheckInerval设置回查时间间隔,默认1分钟
事务回查机制
事务消息是不支持延迟消息和批量消息
无法被消费的消息称为死信消息(重试多次也不成功的消息),死信队列存储死信消息
死信3天后会被自动删除,所以要及时处理
一个死信队列对应一个GroupID,而不是单个消费者对应GroupID有死信之后才会创建死信队列。控制台可查询死信
一个死信队列拥有GroupID的所有死信,不论是哪个topic的
死信队列(DLQ)
网络闪断或客户端宕机时,客户端应答失败,生产者会重复发消息,消费者后面就会收到两条messageID一样的两条消息
投递给消费者时,网络闪断应答失败,也会重复投递
broker或客户端重启、扩容等会触发Rebalance时,也可能收到重复消息
重复场景
以唯一的消息key标识,例如订单号
有了唯一订单号,消费者在消费时可通过订单号先去查询判断是否已经处理过
解决办法
消费幂等
FilterServerConsumer类,4.3.0以后不支持了
类过滤
消费者可以指定tag消费,多个可以用“||”分隔,*表所有
由于在broker是通过消费者的tag的hashcode进行判断的,有哈希冲突,所以不准,还需要消费者自己再做判断tag是否是自己订阅的,不是的就丢弃
tag过滤
开启 SQL 过滤的话,Broker 需要开启参数 enablePropertyFilter=true,然后服务器重启生效
必须开启enablePropertyFilter = true
像写SQL语句一样,可以在SQL里写过滤自定义的属性的语句
一般最好不用,有可能类似慢询的sql语句,导致性能差。
SQL92过滤
表达式过滤
消息过滤
深入消息
目前只有ZeroMQ不需要存储
文件系统 > KV存储 > 关系数据库
存储效率
关系数据库 > KV存储 > 文件系统
可靠性
存储知识
所有主题的所有消息,都会顺序追加在commitlog(默认1G)文件中,顺序追加大大增加了写消息的速度
在broker里设置mapedFileSizeCommitLog可改变大小,写满一个commitlog文件又会新起一个
commitlog文件名为文件里第一条消息在整个commitlog文件组中的偏移量,当知道某条消息的offset时,减去文件名就能算出在该文件中的绝对地址
commitlog的文件存储逻辑视图中,每个主题有4个字节存储消息的总长度
存储了consumeQueue里的队列、messageKey、Tag等所有信息所以既保证了和consumequeue的一致性,也保证consumeQueue丢失也能找回来
消息写到commitlog文件中后,都是异步转发消息条目给ConsumeQueue和index文件的(不是转发的整个消息)
commitLog消息存储文件
commitlog采用顺序追加写消息,虽然写快了,但是我们消费消息的时候是按某个topic来消费的,如果直接去commitlog中查找的话需要全量遍历,所以为了按topic快速查找就有了consumeQueue
consumeQueue中存储的是消息条目,不会存储消息的全量信息
为什么要有consumeQueue?
每一条消息对应的consumeQueue结构是:8字节的commitlog偏移量+4字节消息长度+8字节的tag哈希码
consumeQueue里每条消息占20字节,消费的消费进度,实际上就是consumeQueue里的下标
消息到达commitlog文件后,会异步转发到这里对应的主题topic的consumeQueue里,然后供消费者消费
一个consumeQueue文件默认能存30万条消息条目,即占30万*20字节 =600万字节 = 5.8M
存储设计
当消费者拿着topic的消费进度查找消息时,先用消费进度*20算出该条消息在consumeQueue中的实际位置,然后取出20字节拿到该条消息在整个commitlog中的offset和消息长度k;然后通过offset去commitlog中定位到消息的起始位置,然后向后取出k字节数据就是我们想要的数据
consumeQueue也支持按存储时间查找,由于在定位到具体的consumeQueue后采用的二分查找,所以也不是特快
使用consumeQueue查找
在磁盘里每个主题topic会对应一个文件夹,每个主题默认4个分区(Queue),RocketMQ里没有队列的概念
每个分区又会对应一个文件夹,分区文件夹里就是consumeQueue文件,每个文件只有几兆,基本都能读到内存中
consumeQueue里的数据删除了,也可从commitlog里恢复
分区
consumeQueue消息消费队列
commitlog文件只支持按偏移量的快速查找,为了解决按某个属性key的快速查找,所以就有了index文件来存储key与offset的关系
index文件是基于磁盘文件实现的哈希索引
为什么要有index文件?
最小和最大存储时间各占8字节
最小和最大偏移量各占8字节
哈希槽数占4字节(总的槽数)
index条目使用数占4字节
40字节的index文件头
属性key的hash值取余能算出具体位置,里面的值存的在此刻最新index条目的下标
如果有两个key的哈希值一样了,哈希槽里的值是最新index条目的下标,最新index条目的最后4字节存的前一个冲突index条止的下标
500万个哈希槽
之所以采用哈希存储,是为了将index条目设计为定长结构,方便检索与定位条目
属性key的hashCode
物理偏移量offset(就是对应在commitlog文件中的位置)
该时间(timeDiff) = 消息存储时间 - index文件头里的最小存储时间(第一条消息时间戳)
之所以有这个值,是因为消息里的key本身就不是唯一 的,冲突的会比较多,那么在查找消息时(会带上key和消息生成时间),如果遇到冲突的,则通过“消息生成间 - 第一条消息时间” 与timediff作比较,如果前者大则说明些index条目不是我们要找的消息,则继续往前找。所以timediff存在的目的是为了帮助我们快速从众多冲突index条目中快速找到我们想要的如果找到多个index条目,则去把这些消息都查出来,然后比较key的真实内容作判断
时间戳
前一个相同哈希值key对应的index条目下标(占4字节)
index条目结构(每个占20字节)
新增index条目的偏移位置 = index头长度+ 哈希槽数*4 +当前index条目数*20
2000万index条目
consumeQueue在commitlog文件中查找数据时用到了数据长度才能准确将整条消息取出来,为什么index条目查找时,只用了offset定位到了就能取出来,不用长度吗????
疑问?
index存储结构
当出现一个key的哈希值已经存在时,则将哈希槽里的值更新为当前index条目的下标。前一个冲突的下标位置存在最新index条目的最后4个字节里
在查找数据时,通过哈希槽的存的下标找到最后一个相同哈希值的key对应的index条目,然后通过该index条目里的offset找到消息后去验证消息里的key与该key是否一致
如果一致则说明找到了,如果不一致,则通过当前找到的index条目的最后4字节里存的前一个index条目然后再通过当前offset去找到消息验证key,如果不是则继续找前一个条目
解决哈希冲突
index消息索引文件
像topics.json、subscribeOffset.json等配置文件,不要在线上打开此文件,不然会阻塞
config文件
abort文件是启动时创建,正常退出之前删除
启动时若出现abort文件,说明broker是异常退出的
正常退出时,会通过注册JVM的钩子函数删除abort文件
abort文件
存储commitlog文件最后一次刷盘时间戳、consumeQueue最后一次刷盘时间、index索引文件最后一次刷盘时间
checkpoint文件
全量消息都存储在commitlog文件中,然后异步生成转发任务更新ConsumeQueue文件、index文件由于是异步,在broker重启时,就有可能出现数据并没有真正更新到ConsumeQueue和index文件中
加载时,会先判断是否有abort文件,有则是异常退出,没有则是正常退出
commitlog存储了全量消息,checkpoint文件又存储了所有刷盘点,重启时直接根据上次的刷盘点进行重新转发就行
异步停止恢复时,会将最后一个有效文件中的所有消息全部重新转发到ConsumeQueue和index文件中,虽然确保了不丢失消息,但带来了消息重复的问题。所以消费者一定要做幂等设计
如何保证数据一致性?
存储结构
rocketmq的存储与读写是基于JDK NIO的内存映射机制,即先把消息写到内存,然后根据策略在不同的时机刷盘
内存映射就相当于是,一个文件的修改,会被同步到内存里,所以直接读取会很快
文件越大,效率越高
commitLog、consumeQueue、indexFile的单个文件都被设计为固定长度一个文件写满以后再创建一个文件,文件名是第一条消息对应的全局物理偏移量
内存映射
通过broker里的该属性配置是异步还是同步刷盘
flushDiskType
消息写入pageCache之后,立刻通知刷盘并等待,刷完了后再唤醒该线程,最后返回写入成功状态
同步刷盘SYNC_FLUSH
消息只是写了内存的页面缓存里就返回成功,会有一个单独的线程按照某个频率,统一执行刷盘操作
异步刷盘,可批量刷,提高性能但是需要通过集群或数据备份保证宕机不丢数据
堆内存——>堆外内存——>映射内存——>磁盘
异步刷盘ASYNC_FLUSH
consumeQueue的刷盘方式固定为异步刷盘,反正丢了也能重新生成
index文件是每更新一次index文件就将上一次的改动写入磁盘
补充
文件刷盘机制
commitLog、consumeQueue共用一套过期文件机制
删除由定时任务来做,默认10秒执行一次
通过fileReservedTime配置文件保留时间,默认72小时,超过这个时间该文件没有再次更新则认为过期
当多个物理文件删除时,需要间隔一段时间再删除另外一个
deletePhysicFilesInterval:默认100毫秒
删除文件时间间隔
destoryMapedFileIntervalForcibly
删除的文件还在被引用时,允许再等一段时间才删除
过期判断
deleteWhen:04 ,表示凌晨4点删除文件
指定删除时间点
diskSpaceCleanForciblyRatio:文件删除水位,默认85就会删除
磁盘是否充足
diskSpaceWarningLevelRatio:默认达到90%就会阻止新消息插入
物理使用率
diskMaxUsedSpaceRatio:默认75%,在该值以下表示正常
物理磁盘使用率
删除条件
过期文件删除
消息到达主之后,会同步到从服务上,一旦主服务挂了,从服务可以继续提供消费
broker主从机制
单Master
性能最高,但是有一个master宕机期间,未被消费的消息不能消费
多主无从
线上数据要求高的推荐,主与从之间采用同步方式复制数据,但刷盘方式都设置为异步刷盘
多主多从同步复制
主挂了,从还可以继续提供消息的消费。可能会丢失消息
多主多从异步复制
集群部署模式
rocketmq默认就有了2主2从同步、2主2从异步、2主无从三种部署方式配置
主服务监听从服务连接,从服务主动连接主服务器
从服务向主拉取消息偏移
主服务判断从服务中commitlog最大偏移,如果小于主服务中commitlog偏移,则主向从返回消息
主从复制原理
消费者从主服务拉取消息,如果主服务很忙或挂了,则从从服务拉取
读写分离机制
主从同步机制(HA)
消息实时性高,没有消息不用做其他的操作
优点
接收方可能处理不了这么多消息
缺点
可以有一个消息推一个,也可以缓存一批之后再推送
推模式
接收方可根据自己的情况来决定拉消息:处理不过来等会儿拉,或暂停拉取
消费者可以定义缓存多少数据后批量拉取回来
消息延迟:消费者不能频繁拉取,所以会有一定的延迟
太多无用拉请求:某段时间内一直没消息,则循环拉请求就会做无用功
默认都是推模式
生产者
拉模式,推也是拉封装的
消费者
推还是拉?
rocketMQ的推模式,是封装的拉模式,是先把数据拉到消费端,再做的推送
RebalanceService 线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中
PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息
消费端
PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回
如果没有消息,且拉请求允许,则会b style=\
ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作
Broker
RocketMQ的长轮询
消费者去 Broker 拉消息,传了一个超时时间,如果有消息立马返回如 果没有的话,一直等到超时。超 时之后重新发起拉请求
Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了
Kafka 中的长轮询
推拉比较
RocketMQ
0 条评论
下一页