Spring Cloud Alibaba 微服务框架 RocketMQ:分布式消息队列
2022-05-21 12:12:46 1 举报
AI智能生成
Spring Cloud Alibaba 微服务框架 RocketMQ 分布式消息队列
作者其他创作
大纲/内容
RocketMQ介绍
概要
系统间的通信协作方式
HTTP/RPC通信:优点是通信实时,缺点是服务之间的耦合性高
消息队列通信:优点是降低了服务之间的耦合性,提高了系统的处理能力,缺点是通信非实时
Rocket简介
RocketMQ是一个低延迟、高可靠、可伸缩、易于使用的分布式消息中间件(也称消息队列)
RocketMQ具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试与追踪等功能
非常适合在电商、金融等领域广泛使用
RocketMQ的应用场景
削峰填谷
诸如秒杀、抢红包、企业开门红等大型活动皆会带来较高的流量脉冲,很可能因没做相应的保护而导致系统超负荷甚至崩溃
或因限制太过导致请求大量失败而影响用户体验,RocketMQ可提供削峰填谷的服务来解决这些问题
异步解耦
交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注
包括物流、购物车、积分、流计算分析等,整体业务系统庞大而且复杂,RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性
顺序收发
证券交易过程中的时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等
与先进先出(First In First Out,缩写FIFO)原理类似,RocketMQ提供的顺序消息即保证消息的FIFO
分布式事务一致性
交易系统、红包等场景需要确保数据的最终一致性,大量引入RocketMQ的分布式事务
既可以实现系统之间的解耦,又可以保证最终的数据一致性
大数据分析
数据在“流动”中产生价值,传统数据分析大都基于批量计算模型,无法做到实时的数据分析
利用RocketMQ与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析
分布式缓存同步
双11大促商品需要实时感知价格的变化,大量并发访问会导致会场页面响应时间长
集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过RocketMQ构建分布式缓存,可实时通知商品数据的变化
RocketMQ的安装
RocketMQ依赖Java环境,要求有JDK 1.8以上版本
RocketMQ支持三种集群部署模式
RocketMQ的安装方式有两种,一种是源码安装,另一种是使用已经编译好直接可用的安装包
按源码的方式安装步骤
在官网下载RocketMQ的最新版本4.6.0
解压源码并编译打包
启动集群管理NameServer,默认端口是9876
启动消息服务器Broker,指定NameServer的IP地址和端口,支持指定多个NameServer
RocketMQ如何发送消息
使用Spring Cloud Stream对RocketMQ发送消息
在pom.xml中引入jar包:spring-cloud-stream-binder-rocketmq
配置application.properties
name-server指定RocketMQ的NameServer地址
将指定名称为output的Binding消息发送到TopicTest
使用Binder发送消息
@EnableBinding({Source.class})表示绑定配置文件中名称为output的消息通道Binding
Source类中定义的消息通道名称为output
发送HTTP请求http://localhost:8081/send?msg=tcever将消息发送到RocketMQ中
自定义消息通道的名称
RocketMQ如何消费消息
使用Spring Cloud Stream对RocketMQ 接收消息
在pom.xml中引入Jar包:spring-cloud-stream-binder-rocketmq
配置application.properties
name-server指定RocketMQ的NameServer地址
将指定名称为input的Binding消息接收TopicTest消息
定义消息监听消费消息
@EnableBinding({Sink.class})表示绑定配置文件中名称为input的消息通道Binding
Sink类中定义的消息通道的名称为input
@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息
自定义消息通道的名称
RocketMQ集群管理
概要
在分布式服务SOA架构中,任何中间件或者应用都不允许单点存在
服务发现机制是必备的
服务实例有多个,且数量是动态变化的
注册中心会提供服务管理能力,服务调用方在注册中心获取服务提供者的信息,从而进行远程调用
整体架构设计
解决kafka不支持的功能
消费失败重试
定时消息
事务消息
顺序消息有明显缺陷
RocketMQ部署架构图
图片
RocketMQ部署架构中的角色
Producer:消息发布的角色,主要负责把消息发送到Broker,支持分布式集群方式部署。
Consumer:消息消费者的角色,主要负责从Broker订阅消息消费,支持分布式集群方式部署。
Broker:消息存储的角色,主要负责消息的存储、投递和查询,以及服务高可用保证,支持分布式集群方式部署。
NameServer:服务管理的角色,主要负责管理Broker集群的路由信息,支持分布式集群方式部署。
RocketMQ的基本概念
Message:消息,系统所传输信息的物理载体,生产和消费数据的最小单位
每条消息必须属于一个Topic
每条消息拥有唯一的MessageID,且可以携带具有业务标识的Key
Topic:主题,表示一类消息的集合
每个主题都包含若干条消息
每条消息都只能属于一个主题
Topic是RocketMQ进行消息订阅的基本单位
Queue:消息队列,组成Topic的最小单元
默认情况下一个Topic会对应多个Queue
Topic是逻辑概念,Queue是物理存储
在Consumer消费Topic消息时底层实际则拉取Queue的消息
Tag:为消息设置的标志,用于同一主题下区分不同类型的消息
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统
消费者可以根据Tag实现对不同子主题的不同消费的处理逻辑,实现更好的扩展性
UserProperties:用户自定义的属性集合,属于Message的一部分
ProducerGroup:同一类Producer的集合
这类Producer发送同一类消息且发送逻辑一致
如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费
ConsumerGroup:同一类Consumer的集合
这类Consumer通常消费同一类消息且消费逻辑一致
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易
要注意的是,消费者组的消费者实例必须订阅完全相同的Topic
RocketMQ的注册中心NameServer
NameServer简介
是一个非常简单的Topic路由注册中心
其角色类似于Dubbo中依赖的ZooKeeper,支持Broker的动态注册与发现
核心功能
服务注册发现
NameServer接收Broker集群的注册信息,保存下来作为路由信息的基本数据
并提供心跳检测机制,检查Broker是否还存活
路由信息管理
NameServer保存了Broker集群的路由信息,用于提供给客户端查询Broker的队列信息
Producer和Consumer通过NameServer可以知道Broker集群的路由信息,从而进行消息的投递和消费
不采用zookeeper选举的原因
ZooKeeper具备选举功能,选举机制的原理就是少数服从多数
ZooKeeper的选举机制必须由ZooKeeper集群中的多个实例共同完成
ZooKeeper集群中的多个实例必须相互通信,如果实例数很多,网络通信就会变得非常复杂且低效
NameServer的设计简介
NameServer的设计目标是让网络通信变简单,从而使性能得到极大的提升
为了避免单点故障,NameServer也必须以集群的方式部署,但集群中各实例间相互不进行网络通信
NameServer是无状态的,可以任意部署多个实例
Broker向每一台NameServer注册自己的路由信息,因此每一个NameServer实例都保存一份完整的路由信息
NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中的(NameServer支持配置参数的持久化,一般用不到)
NameServer与每台Broker机器保持长连接,间隔30s从路由注册表中将故障机器移除
NameServer为了降低实现的复杂度,并不会立刻通知客户端的Producer和Consumer
NameServer是CAP中的AP架构
NameServer的心跳机制
单个Broker跟所有NameServer保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。NameServer会反查Broker的心跳信息, 如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时NameServer不会主动通知Producer、Consumer有Broker宕机
Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳, 就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡(rebalance)
Producer每30秒从NameServer获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。 在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接
NameServer集群环境下可能出现的问题
问题1:当某个NameServer因宕机或网络问题下线了,Broker如何同步路由信息?
由于Broker会连接NameServer集群的每个实例
Broker仍然可以向其他NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由信息
问题2:NameServer如果检测到Broker宕机,并没有立即通知Producer和Consumer, Producer将消息发送到故障的Broker怎么办?Consumer从Broker订阅消息失败怎么办?
问题3:由于NameServer集群中的实例相互不通信,在某个时间点不同NameServer实例保存的路由注册信息可能不一致
这对发送消息和消费消息并不会有什么影响
解决方案:由客户端解决,Producer和Consumer操作失败重试(Consumer消费也只是出现多一点延迟而已)
RocketMQ与Kafka集群部署对比
Kafka的部署拓扑图
在Kafka中,Topic是逻辑概念,分区(Partition)是物理概念。
1个Topic可以设置多个分区,每个分区可以设置多个副本(Replication),即有1个Master分区、多个Slave分区
拓扑图
图片
拓扑描述
搭建3个Broker构成一个集群,创建一个Topic取名为TopicA,分区是3个,副本数是2个
part表示分区,M表示Master,S表示Slave
在Kafka中消息只能发送到Master分区中,消息发送给Topic时会发送到具体某个分区
如果发送给part0就只会发送到Broker0这个实例
再由Broker0同步到Broker1和Broker2中的part0副本中
如果发送给part1就只会发送到Broker1这个实例,再由Broker1同步到Broker0和Broker2中的part1副本中
RocketMQ的部署拓扑图
在RocketMQ中,Topic也是逻辑概念,队列(Queue)是物理概念(对应Kafka中的分区)。
1个Topic可以设置多个队列,每个队列也可以有多个副本,即有1个Master队列、多个Slave队列。
拓扑图
图片
拓扑描述
同样创建了一个Topic取名为TopicA,队列是3个,副本数也是2个,但构成Broker集群的实例有9个(为什么有9个?)
对比分析
在Kafka中
Master和Slave在同一台Broker机器上
Broker机器上有多个分区,每个分区的Master/Slave身份是在运行过程中选举出来的
Broker机器具有双重身份,既有Master分区,也有Slave分区
在RocketMQ中
Master和Slave不在同一台Broker机器上(为什么不在同一台机器上?)
每台Broker机器不是Master就是Slave,只能是一个身份
Broker的Master/Slave身份是在Broker的配置文件中预先定义好的,在Broker启动之前就已经决定了
如何实现顺序消息
顺序消息的使用场景
交易系统中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出(缩写FIFO)
数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息的顺序也必须保证是新增消息、修改消息
如何发送和消费顺序消息
顺序发送(局部有序)
在applicaton.properties文件中设置同步发送
指定producer.sync=true
默认是异步发送,此处改为同步发送
MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列
顺序消费(局部有序)
在applicaton.properties文件中设置顺序消费
指定consumer.orderly=true
默认是并发消费,此处改成顺序消费
消费实现代码不用改
顺序发送的技术原理
顺序消息分2种情况
局部有序
指发送同一个队列的消息有序,可以在发送消息时指定队列,在消费消息时也按顺序消费
例如同一个订单ID的消息要保证有序,不同订单ID的消息没有约束,相互不影响,不同订单ID之间的消息是并行的
全局有序
设置Topic只有1个队列可以实现全局有序,创建Topic时手动设置
此类场景极少,性能差,通常不推荐使用
消息发送有3种方式
同步
发送网络请求后会同步等待Broker服务器的返回结果,支持发送失败重试
适用于较重要的消息通知场景
异步
异步发送网络请求,不会阻塞当前线程,不支持失败重试
适用于对响应时间要求更高的场景
单向
单向发送原理和异步一致,但不支持回调
适用于响应时间非常短、对可靠性要求不高的场景,例如日志收集
顺序消息发送原理
原理描述
同一类消息发送到相同的队列即可
为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式
否则可能出现先发的消息后到消息队列,此时消息就乱序了
核心源码
同步发送
图片
选择队列
图片
流程描述
根据hashKey计算hash值,hashKey是我们前面例子中的订单ID,因此相同订单ID的hash值相同
用hash值和队列数mqs.size()取模,得到一个索引值,结果小于队列数
根据索引值从队列列表中取出一个队列mqs.get(value),hash值相同则队列相同
在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取
普通发送的技术原理
特殊消息
顺序消息
事务消息
延迟消息
普通消息
日常开发过程中最常用
最常用的场景
系统间的异步解耦
流量的削峰填谷
尽量保证消息高性能收发即可
发送选择队列有2种机制
轮询机制(默认):一个Topic有多个队列,轮询选择其中一个队列
故障规避机制(也称故障延迟机制)
普通消息轮询机制原理
核心源码
图片
过程描述
路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue
每发送一次消息需要查询一次路由,计算器就进行“+1”
通过计数器的值index与队列的数量取模计算来实现轮询算法
弊端
如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败
即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上
无法规避发送失败的情况,因此就有了故障规避机制
顺序消费的技术原理
RocketMQ支持两种消息模式
集群消费(Clustering:默认):在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费
多数场景都使用集群消费
消息每消费一次代表一次业务处理
集群消费表示每条消息由业务应用集群中任意一个服务实例来处理
广播消费(Broadcasting):在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费
少数场景使用广播消费
例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息整个集群都消费一次
顺序消费技术原理
消费核心代码
图片
消费流程描述
同一个消息队列只允许Consumer中的一个消费线程拉取消费
Consumer中有个消费线程池,多个线程会同时消费消息
在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费
消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息
顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度
消费进度会阻塞在当前这条消息,并不会继续消费该队列中后续的消息,从而保证顺序消费
在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数
从而在很长一段时间内无法消费后续消息造成队列消息堆积
并发消费的技术原理
RocketMQ支持两种消费方式
顺序消费,也称为有序消费
并发消费(默认),也称为乱序消费
原理描述
同一个消息队列提供给Consumer中的多个消费线程拉取消费
Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费
如果某个消费线程在监听器中进行业务处理时抛出异常,当前消费线程拉取的消息会进行重试,不影响其他消费线程和消息队列的消费进度
消费成功的线程正常提交消费进度
并发消费相比顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多
消息的幂等性
概要
RocketMQ不保证消息不被重复消费
如果业务对消费重复非常敏感,必须要在业务层面进行幂等性处理
具体实现可以通过分布式锁来完成
消费消息有3种模式
at-most-once(最多一次):广泛使用
消息投递后不论消费是否成功,不会再重复投递,有可能会导致消息未被消费
RocketMQ未使用该方式
at-least-once(最少一次):广泛使用
消息投递后,消费完成后,向服务器返回ACK(消费确认机制),没有消费则一定不会返回ACK消息
由于网络异常、客户端重启等原因,服务器未能收到客户端返回的ACK,服务器则会再次投递,这就会导致可能重复消费
RocketMQ通过ACK来确保消息至少被消费一次
exactly-only-once(精确仅一次)
必须同时满足2个条件
发送消息阶段,不允许发送重复的消息
消费消息阶段,不允许消费重复的消息
RocketMQ未实现该方式
该模式的缺点:实现该模式需要巨大的开销
分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用的
如何实现事务消息
事务消息的使用场景
电商系统中用户下单后新增了订单记录,对应的商品库存需要减少,怎么保证新增订单后商品库存减少呢?
红包业务,张三给李四发红包,张三的账户余额需要扣减,李四的账户余额需要增加,怎么保证张三账户扣钱后李四账户加钱呢?
如何发送事务消息
使用RocketMQ事务消息来模拟下单减库存的场景
发送订单的事务消息,预提交
配置文件
图片
核心代码
图片
代码分析
Order对象保存了订单信息,随机生成一个ID作为消息的事务ID
定义了一个名为OrderTransactionGroup的事务组,用于下一步接收本地事务的监听
此时消息已经发送到Broker中,但还未投递出去,Consumer暂时还不能消费这条消息
执行订单信息入库的事务操作,提交或回滚事务消息
核心代码
图片
代码分析
实现RocketMQLocalTransactionListener接口
使用@RocketMQTransactionListener注解用于接收本地事务的监听
txProducerGroup是事务组名称,和前面定义的OrderTransactionGroup保持一致
RocketMQLocalTransactionListener接口有两个实现方法
executeLocalTransaction:执行本地事务
消息发送成功会回调执行
一旦事务提交成功,下游应用的Consumer能收到该消息
在这里demo的本地事务就是保存订单信息入库
checkLocalTransaction:检查本地事务执行状态
如果executeLocalTransaction方法中返回的状态是未知UNKNOWN或者未返回状态
默认会在预处理发送的1分钟后由Broker通知Producer检查本地事务
在Producer中回调本地事务监听器中的checkLocalTransaction方法
检查本地事务时,可以根据事务ID查询本地事务的状态,再返回具体事务状态给Broker
消费订单消息
配置文件
图片
核心代码
图片
消费事务消息与消费普通消息的代码是一样的,无须做任何修改
事务消息的技术原理
原理概要
RocketMQ采用了2PC的方案来提交事务消息
第一阶段Producer向Broker发送预处理消息(也称半消息),此时消息还未被投递出去,Consumer不能消费
第二阶段Producer向Broker发送提交或回滚消息
流程分析
发送预处理消息成功后,开始执行本地事务
如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给Consumer
图片
如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给Consumer
图片
如果本地事务状态未知,网络故障或Producer宕机,Broker未收到二次确认的消息。 由Broker端发送请求给Producer进行消息回查,确认提交或回滚。
如果消息状态一直未被确认,需要人工介入处理
图片
原理分析
RocketMQ的消息的存储是由ConsumeQueue和CommitLog配合来完成的
ConsumeQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写
如果某个消息只在CommitLog中有数据,而ConsumeQueue中没有,则消费者无法消费
RocketMQ的事务消息实现就利用了这一点
高性能设计
概要
高性能设计体现在3个方面
数据存储设计
顺序写盘
消费队列设计
消息跳跃读
数据零拷贝
动态伸缩的能力
消息队列扩容
Broker集群扩容
消息实时投递
高吞吐量
这主要得益于其数据存储方式的设计
数据存储的核心由两部分组成
CommitLog数据存储文件
ConsumeQueue消费队列文件
数据存储流程
Producer将消息发送到Broker服务器
Broker会把所有消息都存储在CommitLog文件中
再由CommitLog转发到ConsumeQueue文件提供给各个Consumer消费
顺序写盘
顺序写盘指写磁盘上的文件采用顺序写的方式
一次磁盘请求(读或写)完成过程由3个动作组成
寻道:磁头移动定位到指定磁道,时间很长,是指找到数据在哪个地方。
旋转延迟:等待指定扇区旋转至磁头下,机械硬盘和每分钟多少转有关系,时间很短。
数据传输:数据通过系统总线从磁盘传送到内存,时间很短。
顺序写盘性能高的原因
磁盘读写最慢的动作是寻道
缩短寻道时间就能有效提升磁盘的读写速度,最优的方式就是不用寻道
随机写会导致磁头不停地更换磁道,时间都花在寻道上了
顺序写几乎不用换磁道,或者寻道时间很短
RocketMQ顺序写盘流程
CommitLog文件是负责存储消息数据的文件
所有Topic的消息都会先存在{ROCKETMQ_HOME}/store/commitlog文件夹下的文件中
消息数据写入CommitLog文件是加锁串行追加写入
为了保证消息发送的高吞吐量,使用单个文件存储所有Topic的消息
保证消息存储是完全的磁盘顺序写
但这样给文件读取(消费消息)带来了困难
当消息到达CommitLog文件后,会通过线程异步几乎实时地将消息转发给消费队列文件
每个CommitLog文件的默认大小是1GB,写满1GB再写新的文件,大量数据I/O都在顺序写同一个CommitLog文件
文件名按照该文件起始的总的字节偏移量offset命名,文件名固定长度为20位,不足20位前面补0
第一个文件起始偏移量是0,即文件名是00000000000000000000
第二个文件起始偏移量是1024×1024×1024=1073741824(1GB=1073741824 B),
即文件名是0000000001073741824
文件名这样设计的目的
在消费消息时能够根据偏移量offset快速定位到消息存储在某个CommitLog文件
从而加快消息的检索速度
消息数据文件中每条消息数据的具体格式
消费队列设计
设计背景
消费Broker中存储消息的实际工作就是读取文件,但消息数据文件中所有Topic的消息数据是混合在一起的
消费消息时是区分Topic消费的,这就导致如果消费时也读取CommitLog文件会使得消费消息的性能差、吞吐量低
为了解决消息数据文件顺序写难以读取的问题,RocketMQ中设计了消费队列ConsumeQueue
设计原理
ConsumeQueue负责存储消费队列文件
在消息写入CommitLog文件时,会异步转发到ConsumeQueue文件,然后提供给Consumer消费
ConsumeQueue文件中并不存储具体的消息数据,只存CommitLog的偏移量offset、消息大小size、消息Tag Hashcode
ConsumeQueue文件中存储的内容
图片
ConsumeQueue文件
每个Topic在某个Broker下对应多个队列Queue,默认是4个消费队列Queue
每一条记录的大小是20B,默认一个文件存储30万个记录
文件名同样也按照字节偏移量offset命名,文件名固定长度为20位,不足20位前面补0
第一个文件起始偏移量是0,文件名是00000000000000000000,与CommitLog文件一致
第二个文件起始偏移量是20×30w=6000000,第二个文件名是00000000000006000000
客户端消费ConsumeQueue原理
在集群模式下,Broker会记录客户端对每个消费队列的消费偏移量,定位到ConsumeQueue里相应的记录
并通过CommitLog的Offset定位到CommitLog文件里的该条消息
消费定位流程图
图片
消息跳跃读取
概述
消费Broker中存储消息的实际工作就是读取文件,消息队列文件是一种数据结构上的设计
磁盘顺序读写和消息队列文件的设计,为了高性能读数据,除此之外,还使用了操作系统中的PageCache机制
RocketMQ读取消息依赖操作系统的PageCache
PageCache命中率越高则读性能越高
操作系统会尽量预读数据,使应用直接访问磁盘的概率降低
消息队列文件的读取流程
检查要读的数据是否在上次预读的Cache中
如果没有命中Cache,操作系统从磁盘中读取对应的数据页,并将该数据页之后的连续几页一起读入Cache中,
再将应用需要的数据返回给应用,这种方式称为跳跃读取
如果命中Cache,上次缓存的数据有效,操作系统认为在顺序读盘,则继续扩大缓存的数据范围,
将之前缓存的数据页之后的几页数据再读取到Cache中
由于ConsumeQueue和CommitLog文件都是顺序写入的,客户端消费也是按顺序消费,所以能极大的提高PageCache命中率
计算机系统中CPU、RAM、DISK的速度
按速度高低排列为:CPU>RAM>DISK
CPU与RAM之间、RAM与DISK之间的速度和容量差异是指数级的
为了在速度和容量上折中
在CPU与RAM之间使用CPU Cache以提高访存速度
在RAM与磁盘之间,操作系统使用Page Cache提高系统对文件的访问速度
数据零拷贝
在网络通信过程中,通常情况下对文件的读写要多经历一次数据拷贝,
例如写文件数据要从用户态拷贝到内核态,再由内核态写入物理文件
RocketMQ中的文件读写主要通过Java NIO中的MappedByteBuffer来进行文件映射
利用了Java NIO中的FileChannel模型
可以直接将物理文件映射到缓冲区的PageCache
少了一次数据拷贝过程,提高了读写速度
普通拷贝和零拷贝对比
图片
图片
零拷贝技术出现的背景
许多web应用都会向用户提供大量的静态内容,这意味着有很多data从硬盘读出之后,会原封不动的通过socket传输给用户
这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的
kernal把数据从disk读出来,然后把它传输给user级的application
然后application再次把同样的内容再传回给处于kernal级的socket
这种场景下,application实际上只是作为一种低效的中间介质,用来把disk file的data传给socket
data每次穿过user态和kernel边界时,都会被copy,这会消耗cpu,并且占用RAM的带宽
零拷贝
零拷贝指的是用户态与内核态间copy数据的次数为零
零拷贝避免了用户态和内核态间的copy、减少了两次用户态内核态间的切换
零拷贝可以提高数据传输效率,但对于需要在用户传输过程中对数据进行加工的场景(如加密)就不适合使用零拷贝
使用kernel buffer做中介的原因
为什么不是直接把data传到user buffer中,看起来比较低效(多了一次copy),原因如一下:
kernel buffer是用来提高性能的
在进行读操作的时候,kernel buffer起到了预读cache的作用
当写请求的data size比kernel buffer的size小的时候,这能够显著的提升性能(批处理逻辑)
在进行写操作时,kernel buffer的存在可以使得写请求完全异步
缺点:当请求的data size远大于kernel buffer size的时候,这个方法本身变成了性能的瓶颈
原因:data需要在disk,kernel buffer,user buffer之间拷贝很多次(每次写满整个buffer)
解决方法:zero copy正是通过消除这些多余的data copy来提升性能
Java中的零拷贝
Java NIO中的FileChannel拥有transferTo和transferFrom两个方法
可直接把FileChannel中的数据拷贝到另外一个Channel
或直接把另外一个Channel中的数据拷贝到FileChannel
该接口常被用于高效的网络/文件的数据传输和大文件拷贝
在操作系统支持的情况下,通过该方法传输数据并不需要将源数据从内核态拷贝到用户态,再从用户态拷贝到目标通道的内核态
同时也减少了两次用户态和内核态间的上下文切换,也即使用了“零拷贝”
动态伸缩能力
消息队列扩容/缩容
一个Consumer实例可以同时消费多个消息队列中的消息
消息队列数在创建Topic时可以指定,也可以在运行中修改
如果一个Topic的消息量特别大,但Broker集群水位压力还是很低,
就可以对该Topic的消息队列进行扩容,Topic的消息队列数跟消费速度成正比
如果一个Topic的消息量特别小,但该Topic的消息队列数很多,则可以对该Topic消息队列缩容
Broker集群扩容/缩容
如果一个Topic的消息量特别大,但Broker集群水位很高,此时就需要对Broker机器扩容
如果Broker集群水位很低,则可以适当减少Broker服务器来节约成本
扩容方式
直接加机器部署Broker即可
新的Broker启动后会向NameServer注册
Producer和Consumer通过NameServer发现新Broker并更新路由信息
消息实时投递
MQ两种获取消息的方式
Push推模式
当消息发送到服务端时,由服务端主动推送给客户端Consumer
优点:客户端Consumer能实时地接收到新的消息数据
缺点
如果Consumer消费一条消息耗时很长,消费推送速度大于消费速度时,Consumer消费不过来会出现缓冲区溢出
一个Topic往往会对应多个ConsumerGroup,服务端一条消息会产生多次推送,可能会对服务端造成压力
Pull拉模式
由客户端Consumer主动发送请求,每间隔一段时间轮询去服务端拉取消息
优点:Consumer可以根据当前消费速度选择合适的时机触发拉取(能实现批量拉取)
缺点:拉取的间隔时间不好控制
间隔时间如果很长,会导致消息消费不及时,服务端容易积压消息
间隔时间如果很短,服务端收到的消息少,会导致Consumer可能多数拉取请求都是无效的(拿不到消息),
从而浪费网络资源和服务端资源
RocketMQ获取消息的方式
Push和Pull这两种获取消息方式的缺点都很明显,单一的方式难以应对复杂的消费场景
所以RocketMQ中提供了一种推/拉结合的长轮询机制来平衡推/拉模式各自的缺点
长轮询本质上是对普通pull模式的优化
即还是以客户端Consumer轮询的方式主动发送拉取请求到服务端Broker
Broker如果检测到有新的消息就立即返回Consumer
但如果没有新消息则暂时不返回任何信息,挂起当前请求缓存到本地
Broker后台有个线程去检查挂起请求,等到新消息产生时再返回Consumer
平常使用的DefaultMQPushConsumer的实现就是推、拉结合的,既能解决资源浪费问题,也能解决消费不及时问题
Kafka获取消息的方式
kafka也是采用pull模式
对pull模式做了优化
通过参数可以让consumer阻塞知道新消息到达再拉取
也可以阻塞知道消息的数量达到某个特定的量再拉取
高可用设计
概述
计算机系统的可用性用平均无故障时间来度量,系统的可用性越高,则平均无故障时间越长
RocketMQ的高可用设计的体现
消息发送的高可用
场景描述
在消息发送时可能会遇到网络问题、Broker宕机等情况
而NameServer检测Broker是有延迟的
虽然NameServer每间隔10秒会扫描所有Broker信息,但要Broker的最后心跳时间超过120秒以上才认为该Broker不可用,
所以Producer不能及时感知Broker下线
如果在这期间消息一直发送失败,那么消息发送失败率会很高,这在业务上是无法接受的
RocketMQ采用了一些发送端的高可用方案,来解决发送失败的问题,其中最重要的两个设计是重试机制与故障延迟机制
消息存储的高可用
场景描述
发送成功的消息不能丢
Broker不能发生单点故障
出现Broker异常宕机、操作系统Crash、机房断电或断网等情况保证数据不丢
RocketMQ主要通过消息持久化(也称刷盘)、主从复制、读写分离机制来保证
消息消费的高可用
场景描述
实际业务场景中无法避免消费消息失败的情况
可能由于网络原因导致,也可能由于业务逻辑错误导致
但无论发生任何情况,即使消息一直消费失败,也不能丢失消息数据
RocketMQ主要通过消费重试机制和消息ACK机制来保证
集群管理的高可用
场景描述
当部分NameServer节点宕机时不会有什么糟糕的影响
只剩一个NameServer节点RocketMQ集群也能正常运行
即使NameServer全部宕机,也不影响已经运行的Broker、Producer和Consumer
RocketMQ主要通过NameServer无状态设计保证
消息发送重试机制
描述:在消息发送出现异常时会尝试再次发送,默认最多重试3次
特点
重试机制仅支持同步发送方式,不支持异步和单向发送方式
根据发送失败的异常类型处理策略略有不同
如果是网络异常RemotingException和客户端异常MQClientException会重试
如果是Broker服务端异常MQBrokerException和线程中断异常InterruptedException则不会再重试,且抛出异常
核心源码
图片
故障规避机制
故障规避机制用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题(默认不开启)
故障规避机制逻辑
在开启的情况下,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外
规避时间是衰减的,如果Broker一直不可用,会被NameServer检测到并在Producer更新路由信息时进行剔除
选择查找路由时,选择消息队列的关键步骤
先按轮询算法选择一个消息队列
图片
从故障列表判断该消息队列是否可用
判断其是否在故障列表中,不在故障列表中代表可用
图片
在故障列表faultItemTable中还需判断当前时间是否大于等于故障规避的开始时间startTimestamp
使用这个时间判断是因为通常故障时间是有限制的,Broker宕机之后会有相关运维去恢复
故障机器FaultItem在消息发送失败时进入故障列表faultItemTable中
FaultItem存储了Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,会用来判断Queue是否可用
故障周期时长为0就代表了没有故障
当响应时长超过100毫秒时代表Broker可能机器出现问题,也会进入故障列表
网络异常、Broker异常、客户端异常则设定故障周期为10分钟
同步刷盘与异步刷盘
刷盘是指消息数据发送到Broker之后,写入磁盘中做持久化,保障在Broker出现故障重启时数据不会丢失
两种刷盘机制
同步刷盘
在同步刷盘的模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件
同步刷盘源码分析
源码入口:CommitLog#handleDiskFlush
图片
GroupCommitRequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘
而刷盘线程GroupCommitService每间隔10毫秒写一批数据到磁盘
不直接写的主要原因是磁盘I/O压力大、写入性能低
每间隔10毫秒写一次可以提升磁盘I/O效率和写入性能
GroupCommitService是刷盘线程,内部有两个刷盘任务列表
图片
在执行service.putRequest(request)时仅提交刷盘任务到任务列表
request.waitForFlush会同步等待GroupCommitService将任务列表中的任务刷盘完成
这里有两个队列读写分离,requestsWrite是写队列,用于保存添加进来的刷盘任务,requestsRead是读队列,
在刷盘之前会把写队列的数据放到读队列
图片
刷盘的时候依次读取requestsRead中的数据写入磁盘,写入完成后清空requestsRead。
读写分离设计的目的是在刷盘时不影响任务提交到列表。
图片
mappedFileQueue.flush(0)是刷盘操作,通过MappedFile映射的CommitLog文件写入磁盘
图片
异步刷盘
默认采用异步刷盘,异步刷盘又有两种策略:开启缓冲池和不开启缓冲池
不开启缓冲池(默认)
处理流程
刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘
这间隔500毫秒仅仅是尝试,实际去刷盘还得满足一些前提条件
即距离上次刷盘时间超过10秒,或者写入内存的数据超过4页(16KB)
这样即使服务器宕机,丢失的数据也是在10秒内的或大小在16KB以内的
核心源码
开启缓冲池
处理流程
RocketMQ会申请一块和CommitLog文件相同大小的堆外内存用来做缓冲池,数据会先写入缓冲池
提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘
刷盘最终还由FlushRealTimeService来完成,和不开启缓冲池的处理一致
使用缓冲池的目的是多条消息合并写入,从而提高I/O性能
核心源码
主从复制
设计背景
RocketMQ为了提高消息消费的高可用性
避免Broker发生单点故障引起存储在Broker上的消息无法及时消费
同时避免单个机器上硬盘坏损出现消息数据丢失
复制方式
同步复制:SYNC_MASTER
Master服务器和Slave服务器都写成功后才返回给客户端写成功的状态。
优点:如果Master服务器出现故障,Slave服务器上有全部数据的备份,很容易恢复到Master服务器
缺点:由于多了一个同步等待的步骤,会增加数据写入延迟,并且降低系统的吞吐量
异步复制:ASYNC_MASTER
仅Master服务器写成功即可返回给客户端写成功的状态
优点:由于没有那一次同步等待的步骤,服务器的延迟较低且吞吐量较高
缺点:如果Master服务器出现故障,有些数据因为没有被写入Slave服务器,未同步的数据有可能会丢失
通过配置文件${ROCKETMQ_HOME}/conf/broker.conf里的brokerRole参数进行设置复制方式
实践经验
在实际应用中需要结合业务场景,合理设置刷盘方式和主从复制方式
不建议使用SYNC_FLUSH同步它刷盘方式,因为它会频繁地触发写磁盘操作,性能下降很明显
高性能是RocketMQ的一个明显特点,因此放弃性能是不合适的选择
通常可以把Master和Slave设置成ASYNC_FLUSH异步刷盘、SYNC_MASTER同步复制,这样即使有一台服务器出故障,仍然可以保证数据不丢失
读写分离
读写分离机制也是高性能、高可用架构中常见的设计
实现方式
RocketMQ的Consumer在拉取消息时,Broker会判断Master服务器的消息堆积量以决定Consumer是否从Slave服务器拉取消息消费
默认一开始从Master服务器上拉取消息
如果Master服务器的消息堆积超过了物理内存的40%,则会在返回给Consumer的消息结果里告知Consumer,
下次需要从其他Slave服务器上拉取消息
消费重试机制
背景
实际业务场景中无法避免消费消息失败的情况
消费失败可能是因为业务处理中调用远程服务网络问题失败,不代表消息一定不能被消费,通过重试可以解决
重试队列
进入机制
在Consumer由于业务异常导致消费消息失败时
将消费失败的消息重新发送给Broker保存在重试队列
这样设计的原因是不能影响整体消费进度又必须防止消费失败的消息丢失
特点
重试队列的消息存在一个单独的Topic中,不在原消息的Topic中,Consumer自动订阅该Topic
重试队列的Topic名称格式为“%RETRY%+consumerGroup”
每个业务Topic都会有多个ConsumerGroup,每个ConsumerGroup消费失败的情况都不一样,因此各对应一个重试队列的Topic
死信队列
进入机制
由于业务逻辑Bug等原因,导致Consumer对部分消息长时间消费重试一直失败
为了保证这部分消息不丢失,同时不能阻塞其他能重试消费成功的消息
超过最大重试消费次数之后的消息会进入死信队列
特点
消息进入死信队列之后就不再自动消费,需要人工干预处理
死信队列也存在一个单独的Topic中,名称格式为“%DLQ%+consumerGroup”
消费重试流程
通常故障恢复需要一定的时间,如果不间断地重试,重试又失败的情况会占用并浪费资源
所以RocketMQ的消费重试机制采用时间衰减的方式,使用了自身定时消费的能力
首次在10秒后重试消费,如果消费成功则不再重试,如果消费失败则继续重试消费
第二次在30秒后重试消费,依此类推,每次重试的间隔时间都会加长
直到超出最大重试次数(默认为16次),则进入死信队列不再重试
重试消费过程中的间隔时间使用了定时消息
重试的消息数据并非直接写入重试队列,而是先写入定时消息队列,再通过定时消息的功能转发到重试队列
定时消息
也称延迟消息,等待指定的延迟时间后再进行消费(RocketMQ支持)
除了支持消费重试机制,延迟消息也适用于一些处理异步任务的场景
RocketMQ不支持任意时间精确的延迟消息
仅支持1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min、1h、2h
ACK机制
背景
在实际业务场景中,业务应用在消费消息的过程中偶尔会出现一些异常情况
例如程序发布导致的重启,或网络突然出现问题
此时正在进行业务处理的消息可能消费完了,也可能业务逻辑执行到一半没有消费完
通过消息的ACK机制识别有没有消费完
广播模式的消费进度保存在客户端本地
集群模式的消费进度保存在Broker上
集群模式中RocketMQ中采用ACK机制确保消息一定被消费
ACK机制处理流程
在消息投递过程中,不是消息从Broker发送到Consumer就算消费成功了
需要Consumer明确给Broker返回消费成功状态才算(提交ACK给broker成功)
如果从Broker发送到Consumer后,已经完成了业务处理,但在给Broker返回消费成功状态之前
Consumer发生宕机或断电、断网等情况,Broker未收到反馈,则不会保存消费进度(提交ACK给broker失败)
Consumer重启之后,消息会重新投递,此时也会出现重复消费的场景
因此消息幂等性需要业务自行保证
Broker集群部署
概要
Broker集群部署是消息存储高可用的基本保障
最直接的表现是Broker出现单机故障或重启时,不会影响RocketMQ整体的服务能力
单Master模式
单Master模式仅部署一台Broker机器,属于非集群模式
优点:部署方式简单
缺点
存在单点故障的风险,一旦Broker重启或者宕机,会导致整个服务不可用
不建议线上环境使用,仅可以用于本地测试
多Master模式
一个集群全部都是Master机器,没有Slave机器,属于不配置主从复制的场景
优点
配置简单,单个Master宕机或重启维护对应用无影响
在磁盘配置为RAID10时,即使机器宕机不可恢复,由于RAID10磁盘非常可靠,
消息也不会丢失(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
缺点
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅
消息的实时性会受到影响,这个缺点是致命的
一段时间内部分消费不可用
违背系统的可用性原则
不建议线上环境使用
异步复制的多Master多Slave模式
每个Master配置一个Slave,有多对Master-Slave,主从复制采用异步复制方式,主备有短暂消息延迟(毫秒级)
优点
即使磁盘损坏,消息丢失非常少,且消息实时性不会受影响
同时Master宕机后,消费者仍然可以从Slave消费
而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样
缺点
在Master宕机且磁盘损坏的情况下可能会丢失少量消息
出现这种场景的概率很小,有风险但是很低
同步复制的多Master多Slave模式
每个Master配置一个Slave,有多对Master-Slave,主从复制采用同步复制方式,即只有主备都写成功,才向应用返回成功
优点
数据与服务都无单点故障
在Master宕机的情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点
性能比异步复制模式略低(大约低10%左右)
发送单个消息的RT会略高
线上推荐使用异步刷盘+同步复制的多Master多Slave模式
Spring Cloud Alibaba RocketMQ
Spring Cloud Stream
Spring Cloud Stream是Spring Cloud体系内的一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,
其目的是简化消息业务在Spring Cloud应用程序中的开发
架构图
图片
架构描述
应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信
消息通道通过特定的中间件绑定器Binder实现连接到外部代理
Spring Cloud Stream的实现基于发布/订阅机制
核心构成部分
Spring Messaging(Spring Framework)
Spring Framework中的统一消息编程模型
核心对象
Message:消息对象,包含消息头Header和消息体Payload。
MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
MessageHandler:消息处理器接口,用于处理消息逻辑。
Spring Integration(Spring Framework)
Spring Framework中用于支持企业集成的一种扩展机制,作用是提供一个简单的模型来构建企业集成解决方案,
对Spring Messaging进行了扩展
核心对象
MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器。
MessageRouter:消息路由接口,定义默认的输出消息通道。
Filter:消息的过滤注解,用于配置消息过滤表达式。
Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
Splitter:消息的分割,用于将一条消息拆分成多条。
Binders(Spring Cloud Stream)
目标绑定器,负责与外部消息中间件系统集成的组件
核心模块
doBindProducer:绑定消息中间件客户端发送消息模块。
doBindConsumer:绑定消息中间件客户端接收消息模块。
Bindings(Spring Cloud Stream)
外部消息中间件系统与应用程序提供的消息生产者和消费者(由Binders创建)之间的桥梁
提供者
Spring Cloud Stream官方提供了Kafka Binder
Spring Cloud Stream官方提供了RabbitMQ Binder
Spring Cloud Alibaba官方提供了RocketMQ Binder
功能总结
Spring Cloud Stream提供了简单易用的消息编程模型,内部基于发布/订阅模型实现
Spring Cloud Stream的Binder提供标准协议,不同的消息中间件都可以按照标准协议接入
Binder提供bindConsumer和bindProducer接口协议,分别用于构造生产者和消费者
Spring Cloud Alibaba RocketMQ
架构图
图片
核心部分
MessageChannel(output):
消息通道,用于发送消息,Spring Cloud Stream的标准接口。
MessageChannel(input):
消息通道,用于订阅消息,Spring Cloud Stream的标准接口。
Binder bindProducer:
目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器,
由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
Binder bindConsumer:
目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道,
由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
Spring Cloud Stream 消息发送流程
消费发送流程图
图片
源码分析
Spring Cloud Stream 消息订阅流程
消息订阅流程图
图片
源码分析
RocketMQ Binder 集成消息发送
RocketMQ Binder 集成消息订阅
参考资料
RocketMQ吐血总结
如果导图对您有用,请在右上角给点个赞吧
0 条评论
下一页