Rocketmq知识图谱
2024-09-25 11:19:41 0 举报
AI智能生成
RocketMQ是一款由阿里巴巴集团开发的高性能、高可用、高吞吐量的分布式消息中间件。它采用消息队列模型,提供主题、订阅、发布等多种通信模式,适用于异步通信、削峰填谷、分布式事务等多种应用场景。RocketMQ知识图谱描绘了RocketMQ的核心概念、通信模型、技术架构以及应用场景等关键信息。它帮助用户更好地理解RocketMQ的设计理念与实现原理,为开发者提供了宝贵的参考资料。
作者其他创作
大纲/内容
业务系统问题
下单核心流程环节太多,性能较差
跟第三方物流系统耦合在一起,性能存在抖动的风险
做秒杀活动时订单数据库压力过大
订单退款的流程可能面临退款失败的风险
关闭过期订单的时候,存在扫描大量订单数据的问题
大数据团队要获取订单数据,存在不规范直接查询订单数据库的问题
MQ解决的问题
异步&解耦
流量削峰
MQ调研选型
考察点
可靠性
业内常用MQ
集群化支持
QPS-性能如何
社区支持粒度和文档建设
常用功能是否完备
横向对比
Rabbitmq
优点
集群化部署
高可用
消息不丢失
支持功能较完备
缺点
吞吐量是这三个产品中最低的较低
erlang语言开发,不变阅读和修改源码做定制化
对集群的横向拓展复杂
总结
适用于不需要太高吞吐量,不做定制化修改的中小型公司
Rocketmq
优点
集群化部署
高可用性
消息可以通过合理配置,做到不丢失
功能完备
java开发,易读性和二开方便
吞吐量相对较高
缺点
官方文档内容较为简单,需要通过实践和源码配合分析
总结
适合广泛使用
Kafka
优点
吞吐量极高
支持集群化部署
缺点
功能相对单一
配置不合理会丢消息
总结
适用于需要高吞吐量,允许适当的消息丢失,不需要太高级功能|日志采集|标杆
总结:https://rocketmq.io/faq/ons-user-question-history16731/?spm=0.29160081.0.0.766c432aTPqqvr
基础架构
节点
Producer
通过tcp与name server建立长连接,定时拉取broker元数据
元数据信息包括:Topic路由信息、messagequeue在哪些broker中
每30s自动和name server进行一次数据获取&更新
Consumer
通过tcp与name server建立长连接,定时拉取broker元数据
Nameserver
用于管理broker server,类似于zookeeper的功能
本身是无状态的,所有的数据是存放在内存当中
支持集群部署,用来规避单节点风险,各节点间数据是不做同步的
Broker
部署架构模式
单节点模式
存在单节点风险
主从架构模式
在master节点故障之后,无法自动恢复
自动切换的Dledger模式
Dledger是什么?
本质上是Raft协议的一个具体实现形式,是一个Java Lib
在Rocketmq怎么实现的?
使用Dledger Commitlog来替换了Rocketmq原生的commitlog,来管理消息的写入
使用Dledger来管理Leader的选举进行
选主是如何进行的?
选主是采用了Raft协议的选举方法,主要分为了两部分
Term 任期
Vote 票数
各节点的角色?
Leader
Follower
Candidate
基于Raft协议进行Leader选举
各个节点在初始状态的情况下,都是Follower角色
先发起一轮投票,都投自己
每个节点都有各自的随机休眠时间,先到休眠时间的节点,会投自己,其他节点会尊重已有投票的选择
优先选择Term大的,再选Vote多的
基于Raft协议进行多副本同步
(两段提交)
(两段提交)
Leader Broker上的Dledger接到一条消息后,会先标记为uncommitted状态
通过DledgerServer组件将消息发送给Follow Broker的DledgerServer
收到消息的Follow Broker必须返回给Leader Broker一个ack
Leader Broker收到半数以上的Follow Broker返回的ack后,将这条消息标记为committed状态
Leader Broker崩溃的话怎么办
Dledger基于raft重新选举Leader,继续对外提供服务。
新选举出的Leader会对没完成数据同步的数据进行恢复性操作,保证数据不会丢失。
其他备注
其实理解起来和Zab协议很类似,对Leader的选举也是Epoch优先,再选SID,再选Mid
因为选举过程中或者选举完成以后,Leader节点会不间断给各个Follower进行心跳探测,这不仅仅是重置了各Follower节点的超时时间
也将数据信息同步给了各个Follower节点
也将数据信息同步给了各个Follower节点
Raft动图:http://www.kailing.pub/raft/index.html
Dledger具体落地
Controller是嵌入到Name server中
Controller是独立集群部署
对于Broker而言,启动没有什么不同,只是在配置文件中新增了部分配置
enableControllerMode:Broker controller 模式的总开关,只有该值为 true,自动主从切换模式才会打开。默认为 false。
controllerAddr:controller 的地址,多个 controller 中间用分号隔开。
概要
注册
Broker Server需要向每个Name Server进行注册
master和slave节点都需要注册
Broker 和Name Server之间是TCP的长连接
心跳机制
Broker 向Name Server每30s发送一次心跳
源码中,心跳就是重新发起了一次注册
Name Server内部维护了一个ConcurrentHashMap来存储Broker的注册信息
Name Server有线程每10s对Broker Server进行一次探活
120s内没有接到Broker心跳确认,认为Broker宕机
数据同步
Producer向Name Server拉取Broker信息
主从同步:Slave节点向Master节点拉取数据
Consumer节点消费Broker中数据
支持读写分离
Rocketmq的读写分离,slave节点的度并不是人为可以控制的,也并不是传统意义上的读写分离
是否从slave节点读取消息,受限于两个条件
master节点的负载情况,由master节点对负载进行判断后,给下一次的数据拉取提供建议
具体的负载情况是受限于内存使用:当主服务器积压的消息超
过了物理内存的 40%,则建议从从服务器拉取
过了物理内存的 40%,则建议从从服务器拉取
如果 slaveReadEnable 为 false,表
示从服务器不可读,从服务器也不会接管消息拉取
示从服务器不可读,从服务器也不会接管消息拉取
slave如果落后master节点过多,就只能在master节点进行读取
故障自动切换
slave节点挂掉,对整体集群影响不大
master挂掉需要运维工程师手动调整配置,把slave切换成master,不支持自动主备切换
master-slave不是彻底的高可用,无法实现自动主备切换,version 4.5后,引入了Dledger,实现了高可用自动切换
核心架构
核心数据模型
Topic
一个Topic可以有上百万甚至更多的数据
消息的分布式存储,实际上是分散在各个Broker当中
Topic可以设置多个message queue做了一个逻辑分割,让数据分布式存储
这种概念,相当于kafka的partition,分散在各个broker中
Message queue
Producer
Name Server集群
拉取Topic的元数据信息
写入数据
均匀写入
master broker1
topic
message queue1
一个message queue由一个消费进程处理
message queue2
master broker2
topic
message queue3
message queue4
故障无法写入
开启sendLatencyFaultEnable自动容错机制
未成功访问broker时,自动回避一段时间
如何选择message queue
默认轮询机制
指定message queue
自定义选择器(Selector)
Broker数据存储
CommitLog消息顺序写入机制
样例: 接到消息
CommitLog
消息a
消息b
消息c
消息d
......
消息a
消息b
消息c
消息d
......
可以是很多log文件,默认每个文件限制1GB大小
采用PageCache写入+OS异步刷盘+磁盘顺序写的策略,
提升性能,基本和使用内存差不多(为什么性能高的问题)
提升性能,基本和使用内存差不多(为什么性能高的问题)
即不直接写入磁盘,先写入OS的pageCache中,再由OS线程异步刷入磁盘
当前Broker下的所有的Topic的message queue消息都顺序写入commitlog文件
在Page Cache中,相当于基于内存操作,此时Broker宕机有数据丢失风险
可以设定为同步刷盘,即强制写入磁盘后才返回ack,会导致吞吐量急剧下降
同步刷盘,除非磁盘损坏,否则数据不会丢失
ConsumeQueue
对应MessageQueue Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件
$HOME/store/consumequeue/{topic}/{queueId}/{filename}
ConsumeQueue里面储存的是CommitLog的偏移量offset、消息的长度和tag的hash
ConsumeQueue同样是基于OS cache的
大部分都是放在OS cache中的,性能非常高
它的每条数据大概在20个字节,30w消息可能才5.7M
如果发送和消费的频率相似,基本上消息写入page cache中,读取也在page cache中,等同于内存操作
如果有大量的消息积压,就会导致消息不断从Disk中进行读取,性能就很低
Consumer Offset
集群模式
任意一条消息,有且仅有一个消费组中的一个机器可以拉取到消息
消费进度是存放在Broker 当中进行统一维护
文件路径通常为${user.home}/store/config/consumerOffset.json
Broker启动时,会加载这个文件,并将其内容加载到一个双层Map(ConsumerOffsetManager)中,以便快速访问和管理消费进度。
广播模式
消费组中的每一个机器可以拉取到消息
因为每个消费者都可以消费到消息,所以需要自行维护Consumer Offset
消费进度(Consumer Offset)会以JSON格式持久化到消费者本地磁盘文件中
默认文件路径为当前用户主目录下的.rocketmq_offsets/${当前消费者ID}/${消费者组名}/offsets.json
实时消息
Rocket MQ提供给我们push|pull 两种消息拉取机制
Push
推送消息->即MQ主动将消息推送给消费者
Rocket MQ的push底层实现其实是一次次的pull request
这就产生了一个问题,如果消费者实时感知消息,就需要通过不断的轮询进行消息的拉取
consumer不断轮询,一次pull request不到,马上就进行下一次pull request,这无疑是非常消耗性能的。
Pull
拉取消息->即消费者通过轮询机制从broker中拉取消息
可能会出现消息积压的问题
长轮询机制
由客户端发起pull请求,服务端接收到客户端请求后,如果发现队列中没有消息,并不立即返回,而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息
如果有,则用现在连接将消息返回给客户端,如果一段时间内还是没有消息,则返回空
好处在于,其本质还是pull,所以消息处理的主动权还是在客户端手里,客户端可以根据自己的能力去做消息处理
而服务端持有请求一段时间的机制由很大程度避免了空拉取,减少资源浪费
而服务端持有请求一段时间的机制由很大程度避免了空拉取,减少资源浪费
但是这种机制也有一定的问题,当客户端数量过多时,服务端可能在某个时间段内需要持有过多的连接,服务端压力比较大
不过一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
不过一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
消息拉取
ConsumeGroup大部分在OS cache中,读取性能很高
CommitLog数据很多,不可能全放在OS cache中,先读取cache中的,再读磁盘
关于何时master让消费者去slave拉取消息? 在ConsumeQueue做了说明
生产配置和压测
机器配置
Name Server
3台及以上
8C16GB,实际利用率非常低,可以参考Zookeeper的配置机器
Broker
3台
24C48GB
单集群->单master & 双slave的Dledger集群模式
Producer
2台
4C8GB
Consumer
2台
4C8GB
监控
MQ监控
Prometheus 利用exporter进行监控
可以采用JMX进行监控
用Rocketmq-Console Dashboard进行监控管理
主机监控
Prometheus
Zabbix
参数调整
操作系统
vm.overcommit_memory=1
把所有可用的物理内存都分配给你
vm.swappiness=10
尽量使用磁盘内存,不要放到swap中去
vm.max_max_count=655360
保证中间件可以开启足够多的线程
ulimit=1000000
最大文件连接数,保证网络通信和磁盘io
JVM
大体就是优化jvm相关参数,rocketmq默认是g1回收器
Rocketmq
sendMessageThreadPoolNums=10
内部用来发送消息的线程数,可根据机器配置相应调整(一般和CPU核数相似)
压测
测试出最高负载
即MQ的TPS和机器的资源和负载之间取得一个平衡
需要关注的影响点
RocketMQ的tps和消息延时
cpu负载情况
内存使用率
jvm gc频率
磁盘io负载
网卡流量
sar -n DEV 1 2
这个命令会显示当前网络接口的实时使用情况,包括每个接口的发送和接收数据包的数量、字节数等信息。但是,它不会自动每隔一段时间更新数据。
应用
发送消息
单向发送
这种模式就是只管发送,不管MQ是否收到消息
类似于Kafka中的ACK=0的时候
produce.SendOneWay(msg);
同步发送
默认是采用了同步刷盘策略
Producer发送给Broker之后,会等待Broker响应(超时时间默认为5分钟)
SendResult send = produce.send(msg)
异步发送
默认采用的是异步刷盘策略
Producer发送给Broker之后,不会同步等待Broker响应(超时时间默认为5分钟)
可以调整失败重试次数
produce.send(msg, new SendCallBack(){
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
各类型对比
发送方式 优点 缺点 使用场景
同步发送 可靠性高,简单易用 延迟较高,吞吐量受限 订单系统、金融交易、重要的消息通知等
异步发送 非阻塞,延迟较低 实现复杂度高,可靠性相对降低 实时数据处理、日志采集、消费信息的推送等
单向发送 高效,延迟最低 无法确认消息是否成功发送,可靠性最低 日志收集、监控数据上报等
同步发送 可靠性高,简单易用 延迟较高,吞吐量受限 订单系统、金融交易、重要的消息通知等
异步发送 非阻塞,延迟较低 实现复杂度高,可靠性相对降低 实时数据处理、日志采集、消费信息的推送等
单向发送 高效,延迟最低 无法确认消息是否成功发送,可靠性最低 日志收集、监控数据上报等
消费消息
Push
Pull
秒杀业务场景
CDN+Nginx,Lua脚本+Redis三级缓存处理静态页面
下单前加入答题模块,避免脚本疯狂调用接口以及错峰
独立部署一套秒杀响应的服务
redis控制扣减库存,快速响应
抢购完毕时,Lua脚本过滤无效请求
瞬时流量进入rocketmq削峰
延时消息
应用场景
电商场景下,下订单15min未付款则关闭订单
针对此种场景,不能频繁扫描db,数据量太大
可以通过MQ的消息延迟机制,先将订单消息发送到MQ,设定15min后消费者可见这条消息
订单服务消费者获取到MQ消息,此时查询订单状态,若仍旧为未付款,则关闭订单
message.setDelayTimeLevel(3)也就是延迟级别,跟消息重发机制那个一样
顺序消息
场景
多个consume消费消息,导致消息消费无序
解决方案
取模选择对应messageQueue+批量等待机制
首先要保证消息是有序进入MQ的
消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue
consume消费消息失败时,不能返回reconsume——later,这样会导致乱序
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
Tag实现消息过滤
设置tag,consume进行消费的时候可以根据tag进行过滤
consume.subscribe("TopicOrderDbData","Table A || Table B")
consume.subscribe("TopicOrderDbData",MessageSelector.bySql("a > 5 and b = 'abc'"))
consume.subscribe("TopicOrderDbData",MessageSelector.bySql("a > 5 and b = 'abc'"))
RocketMQ还是支持比较丰富的数据过滤语法的
1.数值比较,比如:>,>=,<,<=, BETWEEN, =;
2.字符比较,比如:=,<>, IN;
3.IS NULL或IS NOT NULL;
4.逻辑符号AND, OR, NOT;
5.数值, 比如:123,3.1415;
6.字符, 比如:'abc',必须用单引号包裹起来;
7.NULL,特殊的常量
8.布尔值,TRUE或者FALSE
2.字符比较,比如:=,<>, IN;
3.IS NULL或IS NOT NULL;
4.逻辑符号AND, OR, NOT;
5.数值, 比如:123,3.1415;
6.字符, 比如:'abc',必须用单引号包裹起来;
7.NULL,特殊的常量
8.布尔值,TRUE或者FALSE
消息幂等
消息重复的场景
系统间调用的重复机制
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
上述B系统后有个将消息放入MQ的操作
手动提交offset未完成
手动提交offset未完成
消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
重试机制导致的问题
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
解决方案
producer
业务判断法
操作方法
向MQ发送消息时,先确认消息是否在MQ中
缺点
MQ虽然有这个查询功能,但是不建议使用,性能不好
msg.setKeys(orderId);
broker会基于这个key, hash创建一个索引,放到indexFile中
可以根据mq提供的命令,用这个id来查询消息是否存在
未来得及将消息成功写入redis,系统就宕机了,仍然会有重复消息的问题
基于redis的幂等机制
Redis记录是否已成功发送此条消息
consumer
业务判断法
db中数据的状态翻转,查一下就知道是不是消费过了
消息重发及死信队列
场景
生产者发送失败重试
同步或异步发送失败
对于普通消息,默认会重试2次
重试时不会选择上次发送失败的Broker,而是选择其他Broker
会尽可能保证发送成功,但有可能会导致消息重复
oneway消息发送
发送失败是没有重试机制的
消费者消费失败重试
无序消息(普通消息、延时消息、事务消息)
try-cache捕获异常,返回reconsume_later
即通知MQ将消息放到针对你这个消费组的consumeQueue创建的重试队列里,过一会再让消费者消费
消费者组内的重试队列
例如Test_group,重试队列%retry%test_group
集群消费模式下默认最多16次,每次重试间隔如下
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
顺序消息
对于顺序消息,当消费者消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。
消费重试默认间隔时间为1000毫秒。
网络问题导致的重发
发送时网络问题
当一条消息已被成功发送到服务端并完成持久化,但此时出现了网络闪断或客户端宕机,导致服务端对客户端的应答失败。
如果生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同且Message ID也相同的消息。
如果生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同且Message ID也相同的消息。
消费时网络问题
消息已投递到消费者并完成业务处理,但客户端给服务端反馈应答时网络闪断。
为了保证消息至少被消费一次,RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,这也可能导致消费者收到重复消息。
为了保证消息至少被消费一次,RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,这也可能导致消费者收到重复消息。
死信队列
16次重试失败,则放入死信队列
%DLQ%Test_group
这个消息在rocket mq后台管理平台可以看见
后续可以开个后台线程,处理死信队列内的消息
消息零丢失
Producer
事务消息机制->half-commit
流程
先发送一条half消息到MQ,看MQ是不是还活着
成功
说明MQ状态正常,继续做后续流程
这个half消息在消费者看来是不可见的
half消息被存储在特定的系统主题RMQ_SYS_TRANS_HALF_TOPIC中
失败
就不会发送消息给MQ了
执行本地逻辑,db插入等
本地方法失败
发送一个rollback消息到MQ,在内部op_topic中记录此条half消息为rollback状态
成功
CRUD,继续走后续流程
提交commit到MQ
失败
本地操作回滚,db数据状态回滚
MQ后台线程定时扫描half消息,长时间未commit/rollback则回调系统接口,若询问15次后仍未返回,则默认消息rollback了
查询db状态,已关闭则返回rollback给MQ,在内部op_topic中记录此条half消息为rollback状态
处于此种状态的MQ message是对consumer端不可见的
成功
消息已经100%进入到了MQ
同步发送+不断重试机制
流程
先执行本地方法
本地方法执行完成后,同步调用MQ消息
MQ发送失败,重试三次
依旧失败的话回滚
kafka用的就是这种机制,但是这种机制仍然存在问题
MQ在本地事务之外
如果本地方法执行成功,但是发送MQ失败,在重试的时候,本地服务崩溃了就会导致本地方法已经执行完了,
但是MQ没有收到消息,消息就丢了
但是MQ没有收到消息,消息就丢了
MQ在本地事务之内
看似解决了问题,但重试会导致性能接口非常差,而且本地方法中可能还有写redis/es的操作,是无法通过事务回滚的。
Broker
默认异步刷盘->同步刷盘
流程
异步刷盘
MQ接收到消息,写入CommitLog(OS cache)中,写入consumeQueue(OS cache)中
后台线程异步将os cache的数据刷入磁盘
消费者直接从os cache中拿到consumeQueue的偏移量offset对应的commitLog的消息,进行消费
如果此时broker宕机,消息就丢了,因为此时消息都在os cache中,未写入磁盘持久化
同步刷盘
MQ接收到消息,写入commitLog(os cache)中,写入consumeQueue(os cache)中
强制要求同步刷入磁盘
flushDiskType由默认的ASYNC_FLUSH->SYNC_FLUSH
flushDiskType由默认的ASYNC_FLUSH->SYNC_FLUSH
消费者直接从OS cache中拿到consumeQueue的偏移量offset对应的commitLog消息进行消费
数据落入磁盘中,宕机也不会丢失消息
master-slave
上述方案在磁盘坏了的时候,仍然会丢失消息,通过主从架构将消息备份到slave,多备份机制避免因磁盘损坏导致的消息丢失问题
Consumer
手动提交offset+自动故障转移
成功消费消息后,返回一个consume_commit标识
如果消息未消费完宕机,则本地事务回滚,未返回consume_commit标识给MQ
注意,消息消费不能异步
MQ感知到消费者机器宕机,且未完成消息消费,自动将消息发送给消费者组内的其他机器进行消费
消费者机器宕机时,MQ会执行rebalance, 重新分配messagequeue对应消费者机器
拓展
Netty引入
Produce
TCP长连接
端口
Broker
Reactor主线程
SocketChannel
长连接发送消息
SocketChannel
Reactor线程池
线程1
线程2
线程3
线程4
...
Worker线程池
线程1
线程2
线程3
线程4
...
线程1
线程2
线程3
线程4
...
Worker线程池
线程1
线程2
线程3
线程4
...
mmap内存映射技术+pageCache实现性能读取
正常IO读取数据会经过两次内核态切换,两次数据拷贝
用户私有进程->内核IO缓冲区->磁盘
mmap内存映射技术,映射用户私有进程地址和磁盘数据地址到pageCache不需要从OS cache拷贝至用户私有进程
不能太大,1.5G到2G之间
写入同理,直接进入pageCache,然后异步刷盘
除此之外,还有内存预映射机制,文件预热来优化性能
企业级权限控制
通过在broker端放一个acl权限文件,在里面规定好权限,哪个消费者对应哪个topic有什么操作权限等
1、需要在 broker.conf 文件中,增加参数 aclEnable=true。
2、并拷贝 distribution/conf/plain_acl.yml 文件到${ROCKETMQ_HOME}/conf 目录。
3、plain_acl.yml样例参考备注。
2、并拷贝 distribution/conf/plain_acl.yml 文件到${ROCKETMQ_HOME}/conf 目录。
3、plain_acl.yml样例参考备注。
参考连接:https://rocketmq.apache.org/zh/docs/bestPractice/03access/
MQ百万级消息积压处理
一般如果消息不重要的话就在consume上直接释放掉
如果topic的messageQueue设置的比较多,比如设置了20个,consume实例只有4个,那么每个consume实例对应5个messageQueue,这个时候可以申请临时加机器,增加consume实例为20个,达到快速消费的目的
如果messageQueue设置的比较少,比如只设置了4个,那么这个时候就不能通过加consume机器来解决了,这时候就需要修改消费者代码了,不再消费者消费,而是把要消费的消息放到mq的另一个topic中,这个topic设置20个messageQueue,对应20个consume实例,进行消费
金融级系统设计MQ高可用方案
try-catch机制,发送消息到MQ,失败的话捕获,重试3次,依旧失败则认为MQ集群崩溃
这个时候需要把消息有序的放到DB/NO SQL中,待MQ集群恢复正常后,再进行消费
注意:此方案保证消息写入有序
MQ限流方案
防止程序bug等导致疯狂写入MQ消息的情况
可采用令牌桶算法等来进行限制
避免因此种情况导致MQ集群挂掉
MQ迁移方案
双写+双读方案同步数据
MQ生产环境消息追踪
broker的配置文件里开启traceTopicEnable=true
启动broker时,会自动创建一个名为rmq_sys_trace_topic的topic存放消息轨迹记录
通过rocket-console上面的消息轨迹,创建查询任务,也可以看到消息的轨迹
收藏
0 条评论
下一页