RocketMQ面试题
2024-02-15 22:30:41 18 举报
AI智能生成
RocketMQ面试题主要涉及消息队列的基本概念、RocketMQ的架构和组件、消息的发送与消费方式、消息的顺序性和可靠性保证、消息的过滤和路由机制等方面。此外,还可能考察如何进行消息的持久化存储、如何实现消息的高可用性、如何处理消息的延迟和重试机制等。面试者需要具备对消息队列技术的理解和应用能力,能够根据实际场景选择合适的消息队列解决方案,并能够解决常见的问题和挑战。
作者其他创作
大纲/内容
参考自
订单支付成功需要调用库存服务、营销服务等,引入消息队列,下游服务自己去调用即可,不仅解耦还缩短链路
解耦
降低响应时间
异步
削峰填谷
削峰
1 为什么使用消息队列
思路:从可靠性、性能、吞吐量考虑、社区活跃度
ActiveMQ,老古董了,不考虑
性能和吞吐量不太理想
RabbitMQ
虽然性能和吞吐量比较好,但是延迟比较高
Kafka
性能好,高吞吐量,稳定可靠,低延迟
RocketMQ
技术选型对比
综合考虑:面向C端客户,对性能、吞吐量、低延迟、可靠有较高的要求,所以选择RocketMQ
2 为什么选择RocketMQ
单机吞吐量:十万级
可用性高
可靠性高
功能完善,有顺序消息、事务消息、定时消息
支持10亿级别的消息堆积,不会因为堆积导致性能下降
分布式架构,扩展性高
电商业务场景在阿里双11已经经历了多次考验
优点
支持的客户端语言只有Java和C++(不成熟)
缺点
3 RocketMQ优缺点
多个生产者往一个队列发消息,一个队列被多个消费者消费,每条消息只能被一个消费者消费
队列模型
生产者发布消息到主题中,消费者从主题中订阅消息,每份订阅中,订阅者都可以接收到主题的所有消息
发布/订阅模型
4 消息队列有哪些模型
5 RocketMQ的消息模型是怎么样的
消费者组内的消费者共同分担消息,一个队列只被一个消费者消费
集群模式(默认)
消费者组内的消费者独立消费全量消息
广播模式
6 消息的模式有哪些
RocketMQ 一共有四个部分组成:NameServer,Broker,Producer 生产者,Consumer 消费者,它们对应了:发现、存、发、收,为了保证高可用,一般每一部分都是集群部署的
7 RocketMQ的基本架构是怎么样的?
Broker、Producer、Consumer三者都与NameServer建立长连接且会定时注册/获取topic路由信息,Broker是向所有节点建立链接,而Producer和Consumer只选择其中一个;Producer只与Broker的Master建立长连接(要发消息,只选Master写入),而Consumer与Master、Slave建立长连接(要订阅消息,可以从Master/Slave读取);
通信方面
管理Broker(心跳检测)
管理Topic、Broker的路由信息
NameServer
消息存储和转发
Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)
单个Broker与所有NameServer保持长连接(底层是Netty),发送心跳包和Topic注册
Broker
分布式集群部署,通过负载策略(默认是轮询)发给Broker集群
同步阻塞,有响应。一般用于重要消息通知,比如重要邮件通知、营销短信
同步发送
异步非阻塞,有响应。一般用于链路耗时较长且对响应时间敏感的场景,比如用户视频上传后通知启动转码服务
异步发送
只负责发送,没有响应。适用于用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集
单向发送
3种发送方式
Producer
集群模式
2种消费模式(消息模式)
自己拉取,自己控制重试
Pull型
消费者发起长轮询给Broker,一有消息就会投递给消费者
有维护好的重试机制、消费进度管理
Push型
2种消费者类型
消费者
功能方面
8 能介绍下这四部分吗?
通过请求确认机制保证可靠传递
有重试机制,默认3次
业务方做异常兜底(重试机制不一定能发送成功)
同步发送,如果响应为发送失败或异常,应该重试
异步发送,在回调方法检查,发送失败或异常,应该重试
请求确认机制做保障
生产阶段
配置可靠性优先的Broker参数避免宕机丢消息
消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
Broker通过主从模式保证高可用
存储阶段
消费重试机制,默认16次
消费进度管理
消费阶段
从3个阶段考虑
9 如何保证消息不丢失?
保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
业务幂等
消息去重
业务方自己实现
10 如何处理消息重复的问题呢?
考虑:消息积压是由于消息未被消费,堆积导致的,考虑提高消费能力即可
当消息Queue的数量>消费者数量时,增加消费者来提高消费能力
消费者扩容
当消息Queue的数量<=消费者数量时,再扩容消费者就没什么用了,就得考虑扩容消息Queue。新建一个临时topic,设置多一些Queue,使用部分消费者转发消息到新的topic,转发很快的,用新的消费者消费新topic,消费完后恢复原样
消息迁移Queue扩容
合理调整线程池的参数,提高消费的并发性,加快消费
调整消费者线程池的配置
如果不是顺序消息,考虑使用线程池将消费逻辑异步化
优化消费逻辑的代码
可能由于消费失败导致重试,才导致了消息积压,合理调整重试的策略
调整消费重试策略
11 怎么处理消息积压?
效果显著,而且易实现、可靠
是什么:顺序消息是指消费顺序与生产顺序一致
某个 Topic 下的所有消息都要保证顺序;
全局顺序
只要保证每一组消息被顺序消费即可
部分顺序
2种顺序消息
RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
背景
为了保证整个 Topic全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了
先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一
同一个生产者将同一组消息 串行地发送给同一个队列
消费者不能并发处理顺序消息(代码中要使用顺序消费模式)
如何实现
12 顺序消息如何实现
按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂
Filter Server过滤
Broker端过滤
consumer.subscribe(\"TOPIC\
Tag过滤
SQL过滤
Consumer端过滤
总结:一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。
13 如何实现消息过滤
实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。
private String messageDelayLevel = \"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h\";
目前RocketMQ支持的延时级别是有限的
Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务 轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。
总结:临时存储+定时任务
怎么实现延时消息
14 延时消息了解吗?
半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。
事务消息的流程
15 怎么实现分布式消息事务的?
死信队列用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
不再被消费者消费
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。
死信消息的特点
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic
死信队列的特点
16 死信队列知道吗?
因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用
概述:Broker的高可用是通过集群和主从实现的
Broker有2种角色,Master和Slave。Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
消费端高可用
在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。这就实现了写的高可用
发送端高可用
17 如何保证RocketMQ的高可用?
作为消息队列,它是发-存-收的一个模型,对应的就是Producer、Broker、Cosumer;
作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer
RocketMQ是一个分布式消息队列,也就是消息队列+分布式系统
它主要的工作流程:RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成
1 Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
2 Producer在发送消息的时候从NameServer获取Broker服务器(Master)地址,根据负载均衡算法选择一台服务器来发送消息
3 Conusmer消费消息的时候同样从NameServer获取Broker(Master/Slave)地址,然后主动拉取消息来消费(长轮询,主动发起请求,服务端有消息则推送消息给Consumer)
交互流程
18 说一下RocketMQ的整体工作流程?
基于可用性的考虑,根据CAP理论,同时最多只能满足两个点,而Zookeeper满足的是CP,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计
基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的(ZK的写只能由Leader写入并且要同步给所有节点才能认为写入成功,存在延迟,影响性能),Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的
持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响
19 为什么RocketMQ不采用Zookeeper作为注册中心呢?
CommitLog文件
ConsumeQueue文件
Index文件
RocketMQ主要的存储文件包括
ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。Consumer根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样ConsumeQueue文件采取定长设计,可以像数组一样随机访问每一个条目
IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。
消息存储的整体设计
RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。
针对Producer和Consumer分别采用了数据(CommitLog)和索引(ConsumeQueue、IndexFile)部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。
只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
总结
20 Broker是怎么保存数据的呢?
RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache、顺序读写、零拷贝
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
PageCache、顺序读取
另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
零拷贝
1. 从磁盘复制数据到内核态内存;2. 从内核态内存复制到用户态内存;3. 然后从用户态内存复制到网络驱动的内核态内存;4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的
说说什么是零拷贝?
21 说说RocketMQ怎么对文件进行读写的?
在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
同步刷盘
异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。
异步刷盘
提供了2种策略
Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。
刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。
异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。
22 消息刷盘怎么实现的呢?
Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。
所谓的\"latencyFaultTolerance\",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
Producer端发送消息时候的负载均衡
Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
Consumer端的心跳包发送
(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);(2) 根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送通信请求,获取该消费组下消费者Id列表;(3) 先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的的MessageQueue。(4)太长了,详情见https://javabetter.cn/sidebar/sanfene/rocketmq.html#_22-%E8%83%BD%E8%AF%B4%E4%B8%8B-rocketmq-%E7%9A%84%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E7%9A%84
在Consumer实例的启动流程中的启动MQClientInstance实例部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,这个方法是实现Consumer端负载均衡的核心。rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理
Consumer端实现负载均衡的核心类—RebalanceImpl
Consumer端订阅消息的负载均衡
RocketMQ中的负载均衡都在Client端完成,主要可以分为
23 能说下 RocketMQ 的负载均衡是如何实现的?
所谓的长轮询,就是Consumer 拉取消息,如果对应的 Queue 如果没有数据,Broker 不会立即返回,而是把 PullReuqest hold起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
24 RocketMQ消息长轮询了解吗?
RocketMQ面试题
0 条评论
下一页