RocketMQ知识整理
2021-04-06 06:37:37 796 举报
AI智能生成
RocketMQ是阿里巴巴开源的一款分布式消息中间件,它能够处理大量并发的消息传递,并且具有高可用性、可扩展性和可靠性。RocketMQ采用了发布/订阅模式,支持多种消息类型,包括普通消息、顺序消息、事务消息等。它还提供了丰富的监控和管理功能,包括消息轨迹追踪、性能监控和故障排查等。RocketMQ广泛应用于电商、金融、物流等领域,为企业提供了高效、可靠的消息传递解决方案。
作者其他创作
大纲/内容
技术选型
Kafka
优点
吞吐量高,常规机器配置,一台机器可达每秒十几万的QPS,相当强悍。
性能高,基本上发送给kafka都是毫秒级的性能。可用性很高。
可支持集群部署,部分机器宕机可以继续运行。
官方文档很详细
缺点
数据丢失问题
kafka收到消息后会写入磁盘缓冲区里,没有直接落到物理磁盘上去,所以机器本身故障了,可能导致磁盘缓冲区里的数据丢失。
功能单一,主要支持发送消息,消费消息。其他场景比较受限制。
适用场景
各大公司一般把kafka用在用户行为日志的采集和传输上,比如大数据团队要收集APP上用户的一些日志,这种日志就是用kafka来收集和传输的,因为这种日志适当丢失数据也没有关系,而且一般量特别大,要求吞吐量高,一般就是收发消息,不需要太多高级功能。所以比较适合这种场景。
kafka快的原因
顺序读写
索引
批量读写和文件压缩(清除已经更新消息的历史版本)
零拷贝
拓展知识点
用户空间和内核空间
传统I/O过程
topic存储结构
RabbitMQ
优点
可以保证数据不丢失。
也能保证高可用性,即集群部署的时候部分机器宕机可以继续运行。
支持部分高级功能,比如死信队列,消息重试之类的。
官方文档很详细
缺点
吞吐量比较低,一般每秒几万级别,所以遇到特别高并发的情况,支撑起来很困难。
集群扩展很麻烦。
还有一个致命缺陷是开发语言是erlang,国内少有精通erlang语言的工程师,因此也没法阅读源码,甚至修改源码。
适用场景
一些不需要很高吞吐量,也不需要部署特别大规模集群,无需阅读和修改rabbitmq源码的中小型公司
高可用架构
普通集群/镜像集群
RocketMQ
优点
RocketMQ是阿里的开源消息中间件,久进沙场,非常可靠,几乎同时解决了kafka和Rabbitmq的缺陷。
吞吐量很高,单机可达10万QPS以上
可以保证高可用性,性能很高,而且支持通过配置保证数据不丢失,可以部署大规模的集群。
可支持各种高级功能,比如延迟消息,事务消息,消息回溯,死信队列,消息积压等。
基于java开发,符合国内大多数公司技术栈,很容易阅读他的源码,甚至修改他的源码。
缺点
官方文档相对简单。
运用场景
国内很多一线互联网大厂都切换使用RocketMQ了,他们需要rocketMQ的高吞吐量,大规模部署能力,还有各种高阶功能去支撑自己的各种业务场景,同时还可以根据自己的需求定制修改RocketMQ的源码。
基于MQ的数据同步方案
介绍
Mysql Binlog同步系统,会监听Mysql数据库的Binlog,发送给你的系统,让你处理这些增删该操作日志
开源技术方案
阿里开源的Canal
Linkedin开源的Databus
具体方案
采用Canal监听MYSQL binlog,然后直接发送到RocketMQ里
大数据团队的数据同步系统从RocketMQ种获取到Mysql Binlog,接着把其中的数据库操作还原到自己的数据库种就可以。
要解决的问题
MQ消息乱序,导致大数据存储中的数据都错乱了。
万一有顺序的消息处理失败了可以走重试队列吗?
处理不关注表的binlog,很浪费时间。
对应解决方法
业务id相同,必须保证进入同一个MessageQueue,可以采用取模的方法,用id对MessageQueue的数量进行取模。
这时候不能返回RECONSUME_LATER状态,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT,意思是先等一会,一会再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。
发送消息的时候,给消息设置tag和属性。消费消息的时候,则根据tag和属性过滤。
代码
过滤语法
(1)数值比较,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比较,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)逻辑符号 AND,OR,NOT;
(5)数值,比如:123,3.1415;
(6)字符,比如:'abc',必须用单引号包裹起来;
(7)NULL,特殊的常量
(8)布尔值,TRUE 或 FALSE
代码
秒杀系统技术方案
难点分析
高并发的读
高并发的写
如何解决秒杀活动压力过大的问题
加数据库服务器方案
不靠谱,会导致公司服务器成本急剧飙升。
商品详情页系统的架构设计优化
页面数据静态化+多级缓存方案
把秒杀活动的商品详情页做成静态化的。即提前从数据库里把这个页面需要的数据都提取出来组装成一份静态数据放在别的地方,避免每次访问都要访问后端数据库。
CDN+Nginx+Redis多级缓存架构
第一级缓存
请求秒杀商品详情页数据时,从就近的cdn上加载,不需要每次请求都到上海机房。
Nginx基于lua脚本实现本地缓存
提前把秒杀商品详情页的数据放到Nginx中缓存,如果请求发送过来,可以从Nginx中直接加载缓存数据。不需要把请求转发到我们商品系统上去。
Nginx上存在缓存数据过期之类的问题,导致没找到我们需要的数据,此时可以由Nginx中的Lua脚本发送请求到Redis集群中加载我们提前放进去的秒杀商品数据。
如果还是没找到,把请求转发到商品详情页系统里加载即可
用答题的方案避免作弊以及延缓下单
在前端/客户端设置秒杀答题,错开大量人下单的时间,阻止作弊器刷单。
为秒杀独立出来一套订单系统,专门负责处理秒杀请求
如果秒杀下单请求和普通下单请求都由一套订单系统来承载,那么可能导致秒杀下单请求耗尽了订单系统资源,或者导致系统不稳定。然后后导致其他普通下单请求也出现问题。没法完成下单。所以我们一般会对订单系统部署两个集群,一个集群式秒杀订单系统集群,一个集群是普通订单系统集群。
基于redis实现下单时精准扣减库存,一旦库存扣减完则秒杀结束
一般会将每个秒杀商品的库存提前写入Redis中,然后在请求到来之后,直接对redis中库存进行扣减。
抢购完毕后提前过滤无效请求,大幅度消减转发到后端流量
在redis中库存被扣减完成后,说明后续其他请求没有必要发送到秒杀系统中了。因为商品已经被抢购完成了。此时我们可以让Nginx接收到后续请求的时候,直接把后续请求过滤掉。
比如一旦商品抢购完毕,可以在Zookeeper中写入一个秒杀完毕的标志位,然后ZK会反向通知Nginx中我们自己写的Lua脚本,通过Lua脚本后续在请求过来的时候直接过滤掉,不要向后转发了。
瞬时高并发下单请求进入RocketMQ进行削峰,订单系统慢慢拉取消息完成下单操作
如果判断发现通过redis完成了库存扣减,此时直接发送消息到RocketMQ中即可,让普通订单系统从RocketMQ中消费秒杀成功的消息进行常规性的流程处理即可,比如创建订单等等。此时的化,瞬间上万并发的压力会被RocketMQ轻松扛下来,然后普通订单系统可以根据自己的工作负载慢慢的从RocketMQ中拉取秒杀成功的消息,然后进行后续操作就可以了,不会对订单数据库造成过大的压力。
后续订单系统以每秒几千的速率慢慢处理,延迟个可能几十秒,这些下单请求就会处理完毕。
总结
避免直接基于数据库进行高并发库存扣减
架构优化的核心时独立出来一套系统专门处理,避免高并发请求落在mysql上。
后续占据99%的请求,直接在Nginx层面拦截掉
订单请求,写入RocketMQ进行削峰,让RocketMQ轻松抗下高并发压力,让订单系统慢慢消费和处理下单操作
消息零丢失方案
消息发送零丢失
事务消息机制
先发送half消息给mq,如果失败,就无法与mq通信,则回滚本地事务。
如果成功,MQ会把half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个topic对应的一个ConsumeQueue,此时则发送消息给生产者,消费者则完成本地任务。
如果更新本地任务失败(成功),发送rollback(commit)请求。通知删除half消息。本地可以写入支付失败记录,开启后台线程在MYSQL恢复之后,把订单状态更新为已关闭。
当mq系统没有收到half消息commit/rollback响应,mq后台会有定时任务,定时任务会扫描RMQ_SYS_TRANS_HALF_TOPIC的half消息,如果超过一定时间还是half消息,即mq没有收到commit/rollback操作,会回调生产者接口,(最多回调15次,如果15次后你没法告知他half消息的状态,就直接标记为rollback)查询half消息是否应该rockback还是commit,再做处理。
在RocketMQ执行rollback操作时,在内部OP_TOPIC中写入一条rollback OP记录到这个topic中,标记某个half消息是否rollback了。
如果生产者执行了commit操作,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态,接着需要把放在RMQ_SYS_TRANS_HALF_TOPI中half消息给写入到对应topic的ConsumeQueue里。然后消费者就可以进行消费了。
总结
如此复杂的机制(事务消息机制)可能导致整体性能比较差,而且吞吐量比较低,这时可以用同步发送消息+反复重试多次的方案去确保消息绝对投递到了MQ中,似乎还是不够的。
代码
异步刷盘调整为同步刷盘
以免将half topic写入对应topic时系统宕机造成消息丢失。
flushDiskType配置设置为SYNC_FLUSH,默认值时ASYNC_FLUSH,默认是异步刷盘。
通过主从架构模式避免磁盘故障导致的消息丢失
保证数据在Master Broker和Slave Broker上有多个副本的冗余。
消息消费零丢失
代码
消费者获取到一批消息后,回调监听器函数,处理这一批消息,处理完毕后,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意
接着提交这批消息的offset到broker中去。如果系统宕机,没有提交offset给broker,broker不会认为你已经处理完了这批消息,此时他其实会感知到consumer挂了。会把没处理完的消息交到其他消费者机器去进行处理,这种情况下,消息也绝对不会丢失。
总结
发送消息MQ零丢失
方案一(同步发送消息+反复多次重试)
方案二(事务消息机制),两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
MQ收到消息之后的零丢失
开启同步刷盘策略+主从架构同步机制,只要让一个Broker收到消息之后同步写入磁盘,同时复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会再弄丢消息。
消费消息的零丢失
采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,只要记住别采用多线程异步处理消息的方式即可。
不能有任何差错的核心链路采用消息零丢失方案,非核心链路采用同步发送消息+反复重试几次的方案。
生产实战
由于消费者系统故障导致的RocketMQ百万消息积压问题
如果消息允许丢失,就可以紧急修改消费者系统代码,在代码中对所有消息都获取到的就直接丢弃,不做任何处理,这样可以迅速的让积压在MQ的百万消息被处理掉,只不过处理方式是全部丢弃而已。
临时申请16台机器多部署16个消费者系统的实例,然后20个消费者同时消费,每个人消费一个MessageQueue的消息,此时你会发现消费的速度提高五倍,很快积压的百万消息都会被处理完毕。处理完成后,直接下线新加的机器。
如果messageQueue数量有限,只有4个,然后就没办法扩容消费者系统,因为加再多的消费者系统,还是只有4个messageQueue,没法并行消费,所以可以临时修改4个消费者的代码,让他们获取消息后不写nosql,而是直接把消息写入新的topic,这个速度很快,因为仅仅只是读写mq,然后新的messageQueue有20个,可以再部署20台临时追加的消费者系统,去消费新的topic后写入数据到NoSQL里去,这样子也可以迅速增加消费者的并行处理能力,使得用一个新的topic来允许更多的消费者系统并行处理。
金融级的系统如何针对RocketMQ集群崩溃设计高可用方案
通常会在你发送消息到MQ的那个系统中设计高可用的降级方案,这个降级方案通常的思路是,需要在你发送消息到MQ代码里去try catch捕获异常,如果你发送消息到MQ有异常,此时你需要进行重试。
如果连续重试3次还是失败,说明此时可能就是你的MQ集群崩溃了,此时你必须把这条重要的消息写入本地存储中去,可以说是写入数据库中,也可以是写入到机器本地磁盘文件里,或者noSQL存储里,几种方式我们都做过。之后只要你不停的发送消息到MQ去,一旦发现MQ集群恢复了,你必须有一个后台线程可以把之前持久化存储的消息都查询出来,然后依次按照顺序发送到MQ集群里去,这样才能保证你的消息不会因为MQ奔溃而彻底丢失。
你把消息写入存储中暂存时,一定要保证他的顺序,比如按照顺序一条一条的写入本地磁盘文件去暂存消息。一旦集群故障了,你后续的所有写消息代码必须严格按照顺序把消息写入到本地磁盘文件里暂存,这个顺序是要严格保证的。只要有这个方案在,哪怕你的MQ集群突然崩溃了,你的系统也是不会丢失消息的。
给RocketMQ增加消息限流功能保证其高可用性
接收消息时,必须引入限流机制,可以采用令牌桶算法。
设计一套Kafka到RocketMQ的双写+双读技术方案。
如果你要做MQ集群迁移,是不可能简单粗暴的,在某个时间点突然之间就说把所有的producer系统都停机了,然后更新他的代码,接着全部重新上线,然后所有的Producer系统都把消息写入到RocketMQ里去了。
一般来说,首先你要做到双写,也就是说你所有的Producer系统中,要引入一个双写的代码,让他同时往kafka和rocketmq中写入消息,然后多写几天,起码双写要持续1周左右,因为mq一般都是实时数据,里面数据最多保留一周。
当你双写持续一周过后,你会发现你的kafka和rocketmq里面的数据看起来几乎一模一样了,因为MQ也只保留近几天的数据。
但是光双写还是不够的,还需要同时进行双读,也就是说在你双写的同时,你所有的consumer系统都需要同时从kafka和rocketmq获取消息,分别用一摸一样的逻辑处理一遍,只不过从kafka获取到的消息还是走核心逻辑去处理,然后落库或者别的存储什么的,但是对于rocketmq用一样的逻辑处理,只是不落库。
你的consumer系统在同时从kafka和rocketmq进行消息读取的时候,需要统计每个mq当日读取和处理的消息数量,这点非常重要,同时对于rocketmq读取的消息处理之后的结果。可以写入临时存储中。
当你观察一段时间,发现双写和双读一段时间之后,如果所有的consumer系统通过对比发现,从kafka和rocketmq读取和处理的消息数量一致,同时处理之后得到的结果一致,此时就可以判断说当前kafka和rocketmq里的消息一致,可以计算出来的结果也是一致的。
这个时候就可以实施正式的切换了,可以停机Producer,再重新修改上线后,全部修改为仅仅写rocketmq这个时候数据也不会丢失,因为之前已经双写了一段时间后,然后所有的consumer系统可以全部下线后,修改代码再上限,全部基于rocketmq来获取消息,计算和处理,结果写入存储中。
源码分析
IDEA源码9(RocketMQ)导入
目录解析
broker
RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程
client
RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面
common
一些公共的代码
dev
开发相关的一些信息
distribution
部署RocketMQ的一些东西,比如bin目录 ,conf目录,等等
example
RocketMQ的一些例子
filter
RocketMQ的一些过滤器的东西
logappender和logging
RocketMQ的日志打印相关的东西
namesrv
NameServer的源码
openmessaging
开放消息标准,这个可以先忽略
remoting
这个很重要,这里放的是RocketMQ的远程网络通信模块的代码,基于netty实现的
srvutil
一些工具类
store
消息在Broker上进行存储相关的一些源码
style、test、tools
这里放的是checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类
源码调试方法
在源码中打一些端点,观察rocketmq源码运行过程,然后在这个过程中,需要从rocketmq实际运行和使用的角度,去观察他的源码运行流程。概括来讲可以分为三点:场景驱动+通俗语言+大量画图。
场景驱动
按照我们平常使用RocketMQ的各种场景进行源码分析,在一个场景中把各种源码串联起来分析。
通俗语言
用通俗易懂的语言表达
大量画图
尽量多画图
NameServer源码调试
启动类
NameSrvStartup
关键类
NamesrvController
NamesrvController controller = createNamesrvController(args);
start()
initialize()
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.serverBootstrap= new ServerBootstrap();
remotingServer.start()
localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) 设置Netty服务器监听端口号,默认9876
最核心的是用于接收网络请求的初始化Netty网络服务器
Runtime类注册了一个JVM关闭时后的shutdown钩子
registerProcessor()
DefaultRequestProcessor处理broker注册请求
registerBroker()
RouteInfoManager这个数据管理组件里面有如何把一个broker机器的数据放入RouteInfoManager中维护的路由数据表的代码。
核心思路十分简单,就是用一些类似Map的数据结构,去存放Broker的路由数据。包括Broker的clusterName,brokerId,brokerName这些核心数据。
基于java并发包下ReadWriteLock进行读写锁加锁,因为在这里更新了那么多的内存Map数据结构,必须要加一个写锁,此时只能有一个线程更新他们。
启动一个定时调度线程,每隔10s扫描目前不活跃的Broker,使用的是RouteInfoManager中scanNotActiveBroker()方法。
配置类
NamesrvConfig
NettyServerConfig
结合NameServer启动日志观察其启动过程。
总结
初步了解了NameServer是如何启动的,了解到他最核心的就是基于Netty实现了一个网络服务器,然后监听9876端口,可以接收到Broker和客户端的网络请求。
Broker源码调试
启动类
BrokerStartup
createBrokerController()
创建初始化以及启动BrokerController这个broker管理控制组件,让BrokerController去控制管理Broker在JVM进程中的一切行为。包括初始化自己的netty服务器接收客户端的网络请求,包括管理磁盘上的消息数据,以及一代对后台线程的运行。
controller.getConfiguration().registerConfig(properties);
controller.initialize();
一种线程池是处理别人发送过来请求的
一种是后台定时调度任务的
关键类
BrokerController
if(null == System.getProperty()){
NettySystemConfig.socketSndBufSize=131072
}
NettySystemConfig.socketSndBufSize=131072
}
设置Netty网络通信相关变量,socket发送缓冲大小
场景驱动
启动时干了哪些事
Broker启动了,必然要去注册自己到NameServer去,所以BrokerOuterAPI这个组件是核心组件。
Broker启动后,必然有个网络服务器去接收别人的请求,此时NettyServer这个组件是必须知道的。
当你的NettyServer接收到网络请求后,需要有线程池来处理,你这里应该有个处理各种请求的线程池。
处理请求的线程池在处理每个请求的时候,需要各种核心功能组件的协调,比如写入消息到CommitLog,然后写入索引到indexfile和consumer queue文件里去,此时你需要一些messageStore之类的组件来配合。
此外,需要一些后台定时调度运行的线程来工作,比如发送定时心跳到NameServer去。
Broker如何发送心跳进行故障感知
在BrokerController启动的时候,其实不仅仅是发送一次注册请求,而是启动一个定时任务,会每隔一段时间(30s)就发送一次注册请求。
配置类
NettyServerConfig
NettyClientConfig
BrokerConfig
MessageStoreConfig
总结
- BrokerController一旦初始化完成后,其实就准备好了Netty服务器,可以用于接收网络请求,然后准备好了处理各种请求的线程池,准备好了各种执行后台定时调度任务的线程池。同时也会在完成启动过程中向NameServer进行注册,以及保持心跳。
消息生产与消费配置,代码
Producer代码
如何从NameServer拉取topic元数据
如何选择MessageQueue发送
如何与broker进行网络通信
作用
异步化提升性能
降低系统耦合
流量削峰
系统A发送过来的每秒1万请求是一个流量洪峰,然后MQ直接扛下来了,都存储在本地磁盘,这个过程就是流量削峰的过程,瞬间把一个洪峰给削下来了,让系统B后续慢慢获取消息来处理。
集群架构
核心成员
NameServer
作用
路由中心
负责管理集群中所有Broker的信息,让MQ系统可以感知到集群中有哪些Broker。
特点
支持部署多台机器集群化部署,保证高可用性。
每个Broker(不论Master还是Slave)启动都会向所有的NameServer进行注册。
生产者/消费者获取Broker集群信息时,会主动去NameServer中拉取Broker信息。
心跳机制
Broker每隔30s会给所有的NameServer发送心跳,告诉每个NameServer自己还活着。NameServer收到心跳后会更新它的心跳时间。
Broker同时会告诉NameServer自己当前的数据情况,比如哪些topic在自己这里,这些信息都属于路由信息的一部分。
NameServer每隔10s会运行一个任务,检查各个Broker最近一次的心跳时间。如果某个Broker超过120s没有发送心跳,就认为这个Broker挂了。
集群化部署的特点
Peer-to-peer,集群化部署,但是里面任何一台机器都是独立运行的,跟其他机器没有任何通信。
每台NameServer都有完整的集群路由信息,包括所有Broker节点信息,数据信息,等等。只要任何一台存活下来,就可以保证系统的可用性。
Broker
高可用性
利用主从架构实现多副本存储和高可用
集群结构
特点
slave broker 拉取消息采用的是pull模式
消费者可能从Master Broker拉取消息,也可能从Slave Broker拉取消息。Master Broker在返回消息给消费者系统的时候,会根据Master Broker的负载情况和Slave Broker的同步情况,向消费者系统建议下一次拉取消息的时候是从Master Broker拉取还是从Slave Broker拉取。
Master Broker宕机
会导致有部分时间不可用,而且如果Master Broker上如果有部分数据没来得及同步给Slave Broker的话,也会造成数据丢失。
Slave Broker宕机
除了会加大Master Broker的读写压力,其他影响并不大。
生产者一定是投递消息到Master Broker的,然后Master Broker会同步数据给它的Slave Brokers,实现一份数据多份副本,保证Master故障时数据不丢失,可以自动切换为Master提供服务。
几种模式
Master-Slave模式
RocketMQ 4.5之前
Master Broker挂了之后,无法让Slave Broker自动切换。需手动运维,修改配置以及重启机器,切换为新的Master Broker。
不彻底的高可用模式
Master-多Slave模式
RocketMQ 4.5之后
基于Dledger技术实现自动让Slave切换为Master
Dledger实现高可用的原理
Dledger融入RocketMQ后,让一个Master Broker对应多个Slave Broker,然后在Master和Slave中进行数据同步。
此时一旦Master Broker宕机,就可以在多个副本中,通过Dledger技术和Raft协议算法进行leader选举,直接将一个slave broker选举为新的Master Broker,然后这个新Master Broker就可以对外提供服务了。整个故障自动转移的过程也许只需要10秒或者几十秒就可以完成。
Broker跟NameServer之间的通信
心跳机制
Broker会和每个NameServer建立一个TCP长连接,然后定时(每30s)通过TCP长连接发送心跳请求过去,告诉每个NameServer自己目前还活着。
故障感知机制
各个NameServer通过跟Broker建立好的长连接不断收到心跳包,定时(每10s)检查Broker有没有120s都没发送心跳包,来判断集群中各个Broker是否挂了,挂了就从路由信息里面摘除这个Broker
Broker故障处理
开启sendLatencyFaultEnable,会有个自动容错机制,可以避免一个Broker故障后,短时间生产者频繁发送消息到这个故障得Broker上去,出现较多次数的异常,而是在一个Broker故障之后,自动回避一段时间不要访问Broker,过段时间再去访问他。
Broker数据存储
当Broker接收到一条消息,会直接写入磁盘上一个叫CommitLog的日志文件。
CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息后就直接追加写入这个文件的末尾。如果写满1GB,就会创建一个新的CommitLog文件。
在Broker中,对Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件。
在Broker的磁盘上,会有下面这种格式的一系列文件,$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
{topic}指代的是某个Topic
{queueId}指代就是某个MessageQueue
存储在这台Broker机器上的Topic下的一个MessageQueue,他有很多ConsumeQueue文件,这个ConsumeQueue文件存储的是一条消息对应在CommitLog文件中offset偏移量,即一个物理位置,其实是对CommitLog文件一个消息的引用。同时也包含了消息的长度,以及tag hashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据,大概每个文件是5.72MB。
CommitLog文件读写
如何让消息写入CommitLog文件近乎内存写性能的
Broker是基于OS操作系统的PageCache和顺序写两个机制,加OS异步刷盘策略,来提升写入CommitLog文件的性能。
以顺序的方式将消息写入CommitLog磁盘文件,也就是每次写入就是文件末尾加一条数据就可以,对文件进行顺序写的性能要比文件随机写的性能提升很多。
数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的PageCache内存缓存中,然后后续由OS的后台线程选择一个时间,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件。
os cache对于commitLog而言,主要是提升文件写入性能,当你不停的写入时,很多最新的写入数据都会先停留在os cache里,比如这可能有10GB-20GB的数据。之后os会自动把cache里比较旧的一些数据刷入磁盘,腾出来空间给更新写入的数据放在os cache中,所以大部分数据可能多达几个TB的数据都是在磁盘上。
CommitLog是基于os cache+磁盘一起读取的。
读取刚写入的CommitLog的数据,那么大概率还停留在os cache中,此时你可以顺利的直接从os cache中读取CommitLog中的数据,这就是内存读取,性能是很高的。
读取的是比较早之前写入CommitLog的数据,那么数据早就被刷入磁盘了,已经不在os cache里面了,那么此时就只能从磁盘上读取,这个性能稍微差一点。
os cache读 or 磁盘读取?
如果消费者机器一直快速的在拉取和消费处理,紧紧跟上了生产者写入broker的消息速率,那么你每次拉取几乎都是在拉取最近刚写入commitLog的数据,那几乎都是在os cache中。
如果broker负载很高,导致拉取消息的速度很慢,或者你自己消费者机器拉取到一批消息后处理性能很低,处理速度很慢,那会导致你跟不上生产者写入的速率,就会从磁盘中读取数据。
master broker读取 or slave broke读取?
如果出现经常需要从磁盘中读取数据的情况出现,master broker就会认为自己作为master broker负载太高,导致没法及时把消息给消费者,因此消费者落后进度比较多,这时候就会建议消费者从slave broker读取消息。
ConsumeQueue也是基于os cache的
ConsumeQueue会被大量的消费者发送的请求给高并发的读写,所以ConsumeQueue文件的读操作非常频繁。而且同时会极大的影响到消费者进行消息拉取的性能和消费吞吐量。
实际上broker对consumeQueue文件也是基于os cache来进行优化的。对于broker机器磁盘上大量的ConsumeQueue文件,在写入的时候也是优先进入os cache中的。
os自己也有一个优化机制,就是读取一个磁盘文件的时候,自动会把磁盘文件的一些数据缓存到os cache中。大家之前知道ConsumeQueue文件主要时存放消息的offset,所以每个文件很小。实际上ConsumeQueue文件们时不占用多少磁盘空间的,整体数据很小,几乎完全可以被os缓存。
所以实际上在消费者机器拉取消息的时候,第一步大量的频繁读取ConsumeQueue文件,几乎可以说就是跟内存里的数据的性能时一样的。通过这个就可以保证数据消费的高性能以及高吞吐。
刷盘机制
异步刷盘
生产者把消息发送给Broker,Broker将消息写入OS PageCache中,就直接返回ack给生产者。
问题
如果生产者认为消息写入成功了,但实际上那条消息在Broker机器上os cache中的,如果此时Broker直接宕机,那么os cache中这条数据就会丢失的。
适用场景
异步刷盘可以提供超高的写入吞吐量,但是有数据丢失风险。
同步刷盘
生产者发送一条消息出去,broker收到消息后,必须直接强制把这个消息刷入底层的物理磁盘文件中,然后才会返回ack给producer,此时你才知道消息写入成功了。只要消息进入了物理磁盘,那么除非是你的物理磁盘坏了导致数据丢失,否则正常来说数据就不会丢失了。
如果broker还没有来得及将数据刷入磁盘,然后他自己挂了,那么此时对producer来说就会感知到消息发送失败了,然后你只要不停的重试发送就可以了,直到slave broker切换成master broker重新让你写入消息,此时可以保证数据是不会丢的。
如果你强制每次消息写入都要直接写入磁盘中,必将导致每条消息写入性能急剧下降,导致消息写入吞吐量急剧下降。但是可以保证数据不会丢失。
适用场景
大幅度降低写入吞吐量,但是可以让你的数据不丢失。
核心数据模型Topic
特点
数据集合
不同类型的数据放到不同topic里面去
分布式存储
让它的数据分散储存在多台Broker机器上,实现rocketmq集群分布式存储海量消息数据。
数据分片机制
MessageQueue可以将一个Topic的数据拆分成很多个数据分片,然后在每个Broker机器上存储一些MessageQueue
生产者发送消息写入哪个MessageQueue
生产者从NameServer中知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台机器上,哪些MessageQueue在另外一台Broker机器上。
MessageQueue与消费者之间的关系
会均匀的把MessageQueue分配给消费组的多台机器。
基于Netty的高性能网络通信架构
架构原理
Producer要和Broker建立一个TCP长连接,此时Broker上的这个Reactor主线程,会在端口上监听到这个Producer建立连接的请求,接着这个Reactor主线程就会专门负责跟这个Producer按照tcp协议规定的一系列步骤和规范,通过SocketChannel建立好一个长连接。
Reactor主线程建立好的每个连接SocketChannel,都会交给这个Reactor线程池的其中一个线程去监听请求。
这时Producer发送请求来时,他发送一个消息过来到达Broker里的SocketChannel,此时Reactor线程池里的一个线程会监听到这个SocketChannel中有请求到达了。
接着reactor线程从socketChannel中读取出来一个请求,这个请求在正式进行处理之前,必须先要进行一些准备工作和预处理,比如SSL加密验证,编码解码,连接空闲检查,网络连接管理,诸如此类的。这些事会由worker线程池处理,默认有8个线程。
接下来就是正式的业务处理,会交给SendMessage线程池(可配置,配置越多,处理消息吞吐量越高)
高性能高并发分析
利用不同线程池并发执行,任何一个环节在执行的时候,都不会影响其他线程池在其他环节进行请求的处理,各个环节都可以同时高并发的处理大量的请求,这样一套网络通信架构,最终可以实现的效果就是可以以高并发高吞吐的对大量网络请求进行处理,这是保证Broker实现高吞吐的一个关键环节。
Reactor主线程在端口上监听Producer建立连接的请求,建立长连接
Reactor线程池并发监听多个连接的请求是否到达
Worker请求并发对多个请求进行预处理
业务线程池并发的对多个请求进行磁盘读写业务操作
mmap: Broker读写磁盘文件的核心技术
多次数据拷贝的问题
读
写
mmap技术+page cache技术进行文件读写优化
RocketMQ底层对于CommitLog,ConsumeQueue之类的磁盘文件读写操作,基本上都会基于mmap技术实现。
具体到代码层面,就是基于JDK NIO包下的MappedByteBuffer的map()函数,来将一个磁盘文件,比如一个commitLog文件,或者是一个ConsumeQueue文件,映射到内存里来。
刚开始你建立映射的时候,并没有任何的数据拷贝操作,其实磁盘文件还是停留在那里。不过你把物理上的磁盘文件的一些地址和用户私有空间的一些虚拟内存地址进行了一个映射。这个mmap技术在进行文件映射的时候,一般有大小限制,在1.5GB~2GB之间。所以RocketMQ才让CommitLog单个文件在1GB,ConsumeQueue文件在5.72MB,不会太大。限制这个大小后,进行文件读写时,就可以很方便的进行内存映射了。
接着就可以对MappedByteBuffer执行写入操作了,写入的时候,会直接进入PageCache中,然后过了一段时间后,由os的线程异步刷入磁盘中。这时只有一次拷贝的过程,相当于使用mmap技术之后,相比于传统磁盘io的一个性能优化。
读取的时候,也会从PageCache中读取。PageCache在加载数据的时候,会将你加载的数据块临近的其他数据块也一起加载到PageCache中去。
预映射机制+文件预热机制
内存预映射机制
broker会针对磁盘上各种CommtiLog,ConsumeQueue文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用MappedByteBuffer执行map()函数完成映射。这样后续读写文件时,就可以直接执行了。
文件预热
提前对一些文件完成映射之后,因为映射不会直接把数据加载到内存里来,那么后续在读取尤其是CommitLog,ConsumeQueue的时候,其实很有可能会频繁的从磁盘中加载数据到内存中去。所以在执行完map()函数之后,会进行madvise系统调用,提前尽可能多的把磁盘文件加载到内存中。
Producer
作用
发送消息
容错机制
建议采取集群化部署
消息投递过程
在NameServer上拉取路由信息
通过路由信息找到自己要投递消息的topic分布在哪台机器上
根据负载均衡算法,从中选择一台Broker机器,然后跟其建立tcp长连接,通过长连接向Broker发送消息即可。
轮询round robin算法
hash算法
Broker接收到消息后,存储到自己本地磁盘里去。
代码
消息投递模式
同步发送模式
消息发送后,会等待收到发送结果后,代码才会往下走。
异步发送模式
消息发送后,不会等到发送结果,但是MQ返回结果时,会回调你SendCallback的函数。
onSuccess
onException
单向消息模式
消息发送后,代码继续往下走,不关心MQ返回结果。
Consumer
作用
消费消息
容错机制
建议采取集群化部署
消息消费过程
和生产者系统原理类似,会跟NameServer建立长连接,然后拉取路由信息,接着找到自己要获取消息的topic在哪几台Broker上,就可以和Broker建立长连接,从里面拉取消息了。
假设消费者机器发送了拉取请求给broker,他说我这次要拉取messageQueue0中的消息,但是之前都没拉取过,就会从第一条消息开始拉取,于是就会从messageQueue0对应的consumeQueue0中,从里面第一条消息的offset上开始拉取。
根据consumeQueue0中找到的第一条消息的地址,去commitLog中根据这个offset地址去读取出来这条消息的数据,然后把这条消息的数据返回给消费者机器。
处理完这批消息,消费者机器就会提交我们目前的一个消费进度到Broker上去,然后Broker就会存储我们的消费进度,比如我们现在对ConsumeQueue0的消费进度假设就是在offset=1的位置,那么他就会记录下来一个consumeOffset的东西去标记我们的消费进度。
那么下次消费组再次拉取这个consumeQueue的消息,就可以从Broker记录的消费位置开始拉取,不用重头拉取。
代码
消费者
注解方式
@RocketMQMessageListener(
nameServer = "",
topic = "",
consumerGroup = "",
selectorExpression = ""
)
nameServer = "",
topic = "",
consumerGroup = "",
selectorExpression = ""
)
接口
implements RocketMQListener<MessageExt>
MessageExt为Message子类
消息消费模式
Push消费模式
底层也是基于消费者主动拉取的模式来实现的,只不过名字交过push,意思是broker会尽可能实时的把新消息交给消费者机器来处理,消息的实时性更好。
当消费者发送请求到Broker上去拉取消息时,如果有新的消息会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到Broker机器上去拉取下一批消息。
所以消费机器在push模式下会处理完一批消息,立马发起请求拉取下一批消息,消息处理的时效性非常好,看起来和Broker一直不停的推送消息到消费机器一样。
请求挂起和长轮询机制
当你的请求发送到Broker,结果他发现没有新的消息给你处理,就会让请求线程挂起,默认挂起15秒,这个期间它会有后台线程每隔一会儿就去检查一下是否有新的消息给你,另外如果这个挂起的过程中,如果有新的消息到达了会主动唤醒挂起的线程,然后把消息返回给你。
Pull消费模式
pull模式代码写起来更加复杂和繁琐。
消费组消费模式
集群模式
默认的,一个消费组获取到一条消息,指挥交给组内的一台机器去处理,不是每台机器都可以获得这条消息的。
广播模式
改为广播模式的设置
consumer.setMessageModel(MessageModel.BROADCASTING);
一个消费组获取到的一条消息,组内每台机器都可以获取到这条消息,但相对而言广播模式其实用的很少,常见基本上都是使用集群模式来进行消费的。
rebalance机制
如果消费者组中出现机器宕机或者扩容加机器,会进入一个rebalance的环节,重新给各个消费机器分配他们要处理的MessageQueue,进行负载重平衡。
Dledger
Broker高可用架构原理
多副本同步+Leader自动切换
基于Dledger技术的Broker主从同步原理
基于Dledger技术实现Broker高可用架构,实际上就是用DLedger先替换掉原来Broker自己管理的CommitLog,由Dledger先替换掉原来Broker自己管理的CommitLog,由Dledger来管理CommitLog,基于Dledger管理的CommitLog去构建出来机器上的各个ConsumeQueue磁盘文件。
Dledger是如何基于Raft协议选举Leader broker
第一轮选举先投票给自己,所以第一轮选举失败。
随后进入一个随机事件的休眠,先苏醒的Broker投票给自己,其他Broker收到了他的选票也会投票给他。
得到超过半数的选票,就可以快速选举出来一个Leader。
Dledger如果基于Raft协议进行多副本同步
uncommitted阶段
首先Leader Broker上的Dledger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DledgerServer组件把这个uncommitted数据发送到Follower Broker的DledgerServer
committed阶段
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DledgerServer,然后会将消息标记为committed状态。
超过一半的Follower Broker的Dledger返回ack后,Leader Broker上的DledgerServer就会发送committed消息给Follower Broker机器的DledgerServer,让他们把消息标记为committed状态。
会降低TPS
Leader Broker崩溃了怎么办
基于Dledger采用Raft协议的算法,选举出来一个新的Leader Broker继续对外提供服务,对没有完成的数据同步进行一些恢复操作,保证数据不会丢失。
部署实战
机器配置
NameServer
3台
8C16G
500G磁盘
千兆网卡
核心的路由服务,所以8C16G的较高配置的机器,但是他一般就是承载Broker注册和心跳,系统路由表等拉取请求,负载很低,因此不需要特别高的机器配置,部署三台也可以实现高可用的效果。
Broker
3台
24C48G(两颗x86_64 cpu,每颗12核)
1TB磁盘
千兆网卡
Broker是负载最高的,未来要承载高并发写入和海量数据存储,所以要把最高配置的机器留给他,这利用3台机器组成一个“单Master + 双Slave”的集群。
Producer
2台
4C8G
500GB磁盘
千兆网卡
Consumer
2台
4C8G
500GB磁盘
千兆网卡
实战
部署RocketMQ
先安装JDK,在环境变量中设置JAVA_HOME
构建Dledger
git clone https://github.com/openmessaging/openmessaging-storage-dledger.git
cd openmessaging-storage-dledger
mvn clean install -DskipTests
构建rocketMQ
git clone https://github.com/apache/rocketmq.git
cd rocketmq
git checkout -b store_with_dledger origin/store_with_dledger
mvn -Prelease-all -DskipTests clean install -U
rocketMQ配置文件修改
进入一个目录 cd distribution/target/apache-rocketmq
在这个目录,编辑三个文件,一个bin/runserver.sh,一个bin/runbroker,sh,另外一个是bin/tools.sh
操作
快速启动RocketMQ集群
执行命令快速启动RocketMQ集群,就会在当前机器上启动一个NameSever和三个Broker,三个broker中一个是Master,另外两个是Slave,瞬间组成了最小可用的RocketMQ集群。
sh bin/dledger/fast-try.sh start
检查RocketMQ集群的状态,执行右边命令后要等待几秒到十几秒,会看到三行记录,说是一个RaftCluster, Broker名字叫做RaftNode00,然后BID是0,1,2,也可能是0,1,3,这就说明RocketMQ集群启动成功了,BID为0的就是Master,BID大于0的就是Slave,其实在这里也可以叫做Leader和Follower
sh bin/mqadmin clusterList -n 127.0.0.1:9876
主从切换
可以尝试将Slave自动切换为Master,找到BID为0的Broker,用lsof -i:xxx命令找出来占用该端口的进程PID,接着用kill -9的命令杀了,等待10s,再次执行命令查看集群状态。此时如果Leader的BID为0的节点变成另外一个Broker,说明Slave切换成为Master了。
部署NameServer
在三台NameServer机器上,按照上面的步骤安装好Java,构建好Dledger和RocketMQ,编辑对应的文件,设置好JAVA_HOMEJ即可。
执行右边命令启动NameServer,默认监听端口9876
nohup sh mqnamesrv &
部署Broker
启动一个Master Broker和两个Slave Broker,这个启动很简单,分别在上述三台为Broker准备的高配置机器上,安装好java,构建好Dledger和RocketMQ,然后编辑好对应的文件。
接着就可以执行如下命令
nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf &
第一个Broker配置文件是broker-n0.conf,第二个broker的配置文件是broker-n1.conf,第三个broker的配置文件可以是broker-n2.conf
# 集群名称:整个broker集群用一个名称
brokerClusterName = RaftCluster
brokerClusterName = RaftCluster
# 这是Broker的名称,比如你有一个Master和两个Slave,那么他们的Broker名称必须是一样的,因为他们三个是一个分组,如果你有另外一组Master和两个Slave,你可以给他们起个别的名字,比如RaftNode01
brokerName = RaftNode00
brokerName = RaftNode00
# 这个就是你的Broker监听的端口号,如果每台机器上就部署一个Broker,可以考虑用这个端口号,不用修改
listenPort=30911
listenPort=30911
# 这里是配置NameServer的地址,如果你有很多个NameServer的话,可以在这里写入多个NameServer的地址
namesrvAddr=127.0.0.1:9876
namesrvAddr=127.0.0.1:9876
# 下面两个目录是存放Broker数据的地方,你可以换成别的目录,类似于/usr/local/rocketmq/node00之类的
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
# 是否启用Dledger技术,这个必须是true
enableDledgerCommitLog=true
enableDledgerCommitLog=true
# 这个一般建议和Broker名字保持一致,一个Master加两个Slave会组成一个Group
dledgerGroup=RaftNode00
dledgerGroup=RaftNode00
# 对于每组Broker,必须保证他们这个配置是一样的,在这里假设有三台机器部署了Broker,要让他们作为一个组,那么在这里就得写入他们三个ip地址和监听的端口号
dLedgerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLedgerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
# 这个是代表了一个Broker在组里的id,一般就是n0,n1,n2之类的,这个你得跟上面的dledgerPeers中的n0,n1,n2相匹配
dLedgerSelfId=n0
dLedgerSelfId=n0
# 这个是发送消息的线程数量,一般建议你配置成跟你CPU核数一样,比如我们机器假设是24核的,那么这里就改成24核
sendMessageThreadPoolNums=24
sendMessageThreadPoolNums=24
思路
你的Broker是分多组的,每一组是三个Broker,一个Master和两个Slave。对于每一组Broker,他们的Broker名称,Group名称都是一样的。然后你的给他们配置好一样的dLedgerPeers(里面是组内三个Broker的地址)
然后他们得配置好对应得NameServer地址,最后还有就是每个Broker都有自己的ID,在组内唯一得即可,比如说不同组里面都有一个id为n0的broker,这个是可以的。
按照这个思路就可以轻松配置好一组broker,在三台机器上分别用命令启动Broker即可。启动完成后,跟NameServer进行通信,检查Broker集群的状态,命令:sh bin/mqadmin clusterList -n 127.0.0.1:9876
压测代码
可视化与集群监控
机器本身的监控
Zabbix, Open-Falcon
监控集群每台机器的性能和资源使用率。比如,CPU, IO, 磁盘,内存,JVM GC负载和情况
直接使用linux命令行的一些命令来检查这些东西的资源使用率,其实都是可以看到的。包括JVM GC的情况,都是可以通过命令行工具查看的。
压测准备工作
相关参数优化
相关参数优化
优化项
linux os内核参数
jvm各种参数
本身核心参数
具体操作
OS内核参数
大量使用内存
vm.overcommit_memory
值
0
中间件系统申请内存时,os会检查内存是否足够,如果足够就分配内存给你,如果认为不够了,就会直接拒绝你的申请,导致你内存申请失败,进而导致中间件系统异常出错。
1
把所有可用物理内存都分配给你,只要由内存就给你用,这样可以避免内存失败的问题。
2
优化命令
echo 'vm.overcommit_memory=1'>> /etc/sysctl.conf
vm.swappiness
用来控制进程swap行为的,os会把一部分磁盘空间作为swap区域,如果有的进程现在可能不是太活跃,就会被操作系统调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存空间腾出来,交给其他活跃运行的进程来使用。
值
0
尽量别把任何一个进程放到磁盘swap区域里去,尽量大家都使用物理内存。
10
尽量把一些进程给放到磁盘swap区域去,内存腾出来给活跃的进程使用。
60
默认这个参数是60,有点偏高,会导致我们的中间件运行不活跃的时候被迫腾出内存空间,然后放磁盘swap区域。
优化命令
echo 'vm.swappiness=10'>>/etc/sysctl.conf
中间件系统开启大量线程
vm.max_map_count
会影响中间件系统可以开启的线程数量,如果这个参数设置过小,会导致有些中间件无法开启足够的进程,进而导致报错,甚至中间件系统挂掉。默认65536,有时不够,可以设置为655360,调大十倍
echo 'vm.max_map_count=655360' >> /etc/sysctl.conf修改
大量网络通信和磁盘IO
ulimit
用来控制linux上最大文件连接数的,默认值可能是1024,一般是不够的,因为你在大量频繁的读写磁盘文件的时候,或者进行网络通信的时候,都会和这个参数有关。如果采用默认值,可能会出现如下错误:error:too many open files.
优化命令
echo 'ulimit -n 1000000' >> /etc/profile
JVM参数
-server
服务器模式启动
-Xms20g -Xmx20g -Xmn10g
48G内存
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
选择G1垃圾回收器做分代回收,对新生代和老年代都用G1,这里把G1的region大小设置为了16m,这个因为内存比较多,所以region大小可以调大到16m,不然2m的region,会导致region数量过多的
总结
采用G1垃圾回收器,默认堆内存大小是8G
-XX:G1ReservePercent=25
在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代都满了,新生代有对象要进入老年代没有足够内存了。默认值10%偏少。
-XX:InitiatingHeapOccupancuPercent=30
当对内存的使用率达到30%之后就自动启动G1的并发垃圾回收,并开始尝试回收一些垃圾对象。默认值45%,这里调低了一些,提高了GC的频率,避免垃圾对象过多。一次垃圾回收耗时过长的问题。
-XX:SoftRefLRUPolicyMSPerMB=0
建议这个参数不要设置为0,避免频繁回一些软引用的Class对象,这里可以调整为1000
-verbose:gc -Xloggc:/dev/shm/ma_gc%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
确认了gc日志文件的地址,要打印哪些详细信息,然后控制每个gc日志文件的大小是30m,最多保留5个gc日志文件。
-XX:-OmitStackTraceInFastThrow
有时JVM会抛弃一些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来
-XX:+AlwaysPreTouch
这个参数的意思是我们刚开始指定JVM用多少内存,不会真正分配给他,会在实际使用的时候再分配给他。
-XX:MaxDirectMemorySize=15g
这是说RocketMQ里大量用了NIO中的direct buffer,这里限定了direct buffer最多申请多少。如果你机器内存比较大,可以适当调大这个值,如果你机器内存比较大,可以适当调大这个值。
-XX:+UseLargePages -XX:-UseBiasedLocking
这两个参数的意思是禁用大内存页和偏向锁。
RocketMQ核心参数
sendMessageThreadPoolNums=16
修改dledger示例配置文件:rocketmq/distribution/target/apache-rocketmq/conf/dledger
这个参数表示RocketMQ内部用来发送消息的线程池数量,默认是16,这个参数可以根据CPU核数进行适当的增加,比如机器24核,可以增加这个线程数量到24或者30。
总结
中间件系统在压测或者上生产之前,需要对三大块参数进行调整:OS内核参数,JVM参数,以及中间件核心参数
os内核参数主要调整的地方都是跟磁盘IO,网络通信,内存管理,以及线程管理有关的,需要适当调节大小。
JVM参数需要我们去中间件的启动脚本中寻找他的默认JVM参数,然后根据机器情况,对JVM堆内存大小新生代大小,Direct Buffer大小,等等,做出调整,发挥机器的资源。
中间件核心参数关注其中跟网络通信,磁盘IO,线程数量,内存管理相关的,根据机器资源,适当可以增加网络通信线程,控制同步刷盘或者异步刷磁盘,线程数量有多少,内存中一些队列的大小。
压测指标/结果分析
最合适的最高负载
在RocketMQ的TPS和机器的资源使用率和负载之间取得一个平衡
RocketMQ的TPS和消息延时
每条数据的大小是500个字节,和后续的网卡流量有关。
一条消息从Producer生产出来到经过RocketMQ的Broker存储下来,再到被Consumer消费,基本上这个时间跨度不会超过1秒钟,这个性能是正常的可以接受的。
Master Broker的TPS可以稳定的达到7万左右
cpu负载情况
用top命令可以看到cpu load和cpu使用率,这个代表了cpu负载情况。
使用top命令后,看到信息:load average: 12.03, 12.05, 12.08, 代表cpu在1分钟,5分钟,15分钟内cpu负载情况,如果机器是24核的,12的意思就代表有12个核在使用,剩余12个还没用,cpu还有很大的余力。
内存使用率
用free命令可以查看内存的使用率
48G内存只使用了很小一部分,很大部分都是空闲可用,或者是被RocketMQ用来进行磁盘数据缓存了。
JVM GC频率
用jstat命令可以看到RocketMQ的JVM的GC频率
压测时,基本上新生代每隔几十秒就会垃圾回收一次,每次回收过后存活的对象很少,几乎不进入老年代。因此FullGC几乎一次都没有。
磁盘IO负载
用top命令看IO等待占用CPU时间的百分比
Cpu(s): 0.3% us, 0.3% sy, 0.0% ni, 76.7% id, 13.2 wa, 0.0% h, 0.0% si
wa
磁盘IO等待在CPU执行时间种的百分比,如果比例太高,说明CPU执行大部分时间在等待IP,IO负载很高,导致大量IO在等待。
压测时,40%左右,相对高一些,但是还是可以接受的,如果再高,就不行,说明磁盘IO负载可能过高了。
网卡流量
用sar -n DEV 1 2 查看网卡流量。可以看到每秒网卡读写数据量。
压测时服务器使用的是千兆网卡,理论上每秒可以传输128M数据,但是一般实际最大值是每秒传输100M数据。
在RocketMQ处理到每秒7万消息的时候,每条消息500字节左右大小的情况,每秒网卡传输数据量已经达到100M了,已经达到网卡的一个极限值了。
一个Master Broker服务器,每秒不光是通过网络接收写入的数据,还要把数据同步给两个Slave Broker,还有别的网络通信的开销。因此实际压测时发现,每条消息500字节,每秒7万消息的时候,服务器网卡几乎就已经打满了,无法承受更多的消息了。
总结
压测结果分析
经过压测,最终发现我们的服务器性能瓶颈在网卡上,因为网卡每秒能传输的数据是有限的,当我们使用平均大小为500字节消息的时候,最多是做到了单台服务器每秒7万TPS,而且这个时候cpu负载,内存负载,jvm gc负载,磁盘io负载,基本都在正常范围内,但是网卡流量打满就无法再提升TPS了。
压测思路
在TPS和机器的cpu负载,内存使用率,jvm gc频率,磁盘io负载,网络流量负载之间取得一个平衡,尽量让TPS尽可能的高,同时让各项资源负载不要太高。
压测过程
采用几台机器开启大量线程并发读写消息,然后观察TPS,cpu load(使用top命令),内存使用率(使用free命令),jvm gc频率(使用jstat命令),磁盘io负载(使用top命令),网卡流量负载(使用sar命令),不断增加机器和线程,让TPS提上去,同时观察各项资源负载是否过高。
生产集群规划
根据公司后台整体QPS来定,稍微多冗余部署一些机器即可。实际部署生产环境集群时,使用高配置物理机,同时合理调整os内核参数,jvm参数,中间件核心参数,如此即可。
架构特点
高可用
NameServer随便一台机器挂了也不怕,因为是集群化部署,每台机器都有完整信息。
Broker挂了也不怕,挂了Slave对集群没太大影响,挂了Master也会基于Dledger技术实现自动Slave切换为Master。
生产者消费者挂了一台也不怕,因为是集群化部署,其他机器会接管工作。
高并发
假设生产者系统,对topic发起每秒10万qps写入,只要topic分配到5台broker上,实际上每个broker会承载2万qps写入,高并发场景下10万qps可以分散到多台broker上扛下来。
海量消息
分布式存储,支持存储海量消息
可伸缩
如果要扛下更高的并发,存储更多的数据,可以在集群里加入更多的broker机器,就可以线性扩展集群了。
幂等性机制
业务判断法
基于redis缓存的状态判断法
引入redis缓存来存储你是否发送过消息的状态。如果你成功发送了一个消息到MQ里去,你得在redis缓存里写入一条数据。
消息消费异常处理
如果对消息的处理有异常,可以返回RECONSUME_LATER状态,意思是现在没法完成这批消息的处理,麻烦你稍后过段时间再次给我这批消息让我重试一下。然后会把你这批消息放到你这个消费组的重试队列中去。比如你的消费组名称是“VoucherConsumerGroup”,意思是优惠券的消费组,那么他会有一个“%RETRY%VoucherConsumerGroup”这个名字的重试队列。默认最多重试16次。
重试时间间隔可以进行如下配置:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果在16次重试范围内消息处理无法成功,就需要自动进入死信队列,名称是“%DLQ%VoucherConsumerGroup”,根据我们的使用场景,对私信队列中的消息,还是要一直不停的重试。
延迟消息
代码
发送延迟消息的核心,是设置消息的delayTimeLevel
默认支持一些延迟级别如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果设置为3,意思是延迟10s
如果设置为3,意思是延迟10s
RocketMQ生产经验
1.灵活运用tags来过滤数据。
2.基于消息key来定位消息是否丢失。
mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANERCORD -k orderId
3.消息零丢失方案的补充。
4.提高消费者的吞吐量。
可以设置consumer端的参数:consumeThreadMin,consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。
可以开启消费者批量消费的功能,设置consumeMessageBatchMaxSize参数,默认是1,设置的多一些,就会交给你的回调函数一批消息处理
5.消费历史消息
一个是从Topic的第一条数据开始消费。
CONSUME_FROM_LAST_OFFSET
一个是从最后一次消费过的消息之后开始消费。
CONSUME_FROM_FIRST_OFFSET
生产环境RocketMQ其他功能
企业级的权限控制
配置
消息轨迹追踪功能
broker
broker开启配置traceTopicEnable=true,存储消息轨迹追踪数据的。
开启上述选项后,启动这个broker的时候,自动创建出来一个topic,就是RMQ_SYS_TRACE_TOPIC
Broker端也会记录消息的轨迹数据,包括:消息存储的Toic,消息存储的位置,消息的key,消息的tags
producer/consumer
生产者开启代码
Producer会上报这些数据到RMQ_SYS_TRACE_TOPIC:Producer的信息,发送消息的时间,消息是否发送成功,发送消息的耗时。
consumer也会记录这些数据,包括:Consumer的信息,投递消息的时间,这是第几轮投递消息,消息消费是否成功,消费这条消息的耗时。
0 条评论
下一页