RocketMQ知识体系梳理
2023-12-27 15:05:49 45 举报
AI智能生成
RocketMQ知识体系梳理
作者其他创作
大纲/内容
ConsumeGroup
多个机器可同属于一个消费组,例如库存服务4个机器。
消费模式
集群模式
消费组中的一个机器可以拉取到消息
广播模式
消费组中的每一个机器可以拉取到消息
宕机/加机器时,会进行Rebalance,重新分配消费者对应的MessageQueue
Dledger实现高可用
DledgerCommitLog代替Broker管理CommitLog
基于Raft协议选举Leader Broker
先发起一轮投票,都投自己
随机休眠,再次投票,先醒来的投自己,并发送投票给其他人,其他人醒来因为自己没投票,就会尊重发送投票的选择
基于Raft协议进行多副本同步
Leader Broker上的Dledger接到一条消息后,会先标记为uncommitted状态
通过DledgerServer组件将消息发送给Follow Broker的DledgerServer
收到消息的Follow Broker必须返回给Leader Broker一个ack
Leader Broker收到半数以上的Follow Broker返回的ack后,将这条消息标记为committed状态
Leader Broker崩溃的话怎么办
Dledger基于raft重新选举Leader,继续对外提供服务。
新选举出的Leader会对没完成数据同步的数据进行恢复性操作,保证数据不会丢失。
Broker数据存储
CommitLog消息顺序写入机制
样例: 接到消息
CommitLog
消息a
消息b
消息c
消息d
......
消息a
消息b
消息c
消息d
......
可以是很多log文件,每个文件限制1GB大小
采用PageCache写入+OS异步刷盘+磁盘顺序写的策略,提升性能,基本和使用内存差不多
即不直接写入磁盘,先写入OS的pageCache中,再定时异步刷入磁盘,顺序写入文件
相当于基于内存操作,有数据丢失风险
可以设定为同步刷盘,即强制写入磁盘后才返回ack,除非磁盘坏了,不然会导致吞吐量急剧下降
ConsumeQueue
对应MessageQueue Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件
$HOME/store/consumequeue/{topic}/{queueId}/{filename}
ConsumeQueue里面储存的是CommitLog的偏移量offset
ConsumeQueue同样是基于OS cache的
它的每条数据非常小,30w消息可能才5.7M
大部分都是放在OS cache中的,性能非常高
核心数据模型
Topic
一个topic中可能有上百万的数据
分布式存储在多个broker上
topic可以设置多个MessageQueue, 分布在不同的broker上
Message Queue
生产者
NameServer集群
拉取Topic元数据
写入数据
均匀写入
Master Broker
Topic
MessageQueue
一个MessageQueue对应一个消费者机器处理
MessageQueue
Topic
MessageQueue
MessageQueue
故障无法写入
开启sendLatencyFaultEnable自动容错机制
未成功访问broker时,自动回避一段
核心架构
节点
Produce
通过tcp与name server建立长连接,定时拉取broker元数据
NameServer
用于管理broker
支持集群化部署
Peer-to-peer
Broker
支持集群化部署
主从架构
Master Broker
Slave Broker
Master Broker
Slave Broker
Master Broker
Slave Broker
Consume
概要
注册
broker需要向每个NameServer注册
master和slave都需要注册
通过tcp长连接与broker通信
broker跟每个NameServer建立长连接
心跳机制
broker每30s向NameServer发送一次心跳
源码中,心跳即重新发送了一次注册
NameServer内部维护了一个ConcurrentHashMap存储注册的broker
NameServer每10s钟检查一次心跳
120s未接到心跳,则认为该broker宕机了
数据同步
Produce向NameServer拉取broker信息
主从同步:slave从master拉取数据
consume从broker拉取数据消费
支持读写分离
master根据自己的负载,向consume发送建议,是否从slave拉取消息
slave同步落后过多时,master建议consume只从master拉取消息
比如100w消息,slave才同步了96w
故障自动切换
slave挂掉对整体有点影响不过不大
master挂掉需要运维工程师手动调整配置,把slave切换成master,不支持自动主备切换
master-slave不是彻底的高可用,无法实现自动主备切换,version 4.5后,引入了Dledger,实现了高可用自动切换
生产配置以及压测
机器配置
NameServer
3台
8C16GB
Broker
3台
24C48GB
单集群->单master & 双slave
Produce
2台
4C8GB
Consume
2台
4C8GB
监控
rocket-console可视化监控平台
MQ监控
Zabbix || Open-Falcon
机器资源监控。 cpu || io || jvm
参数调整
操作系统
vm.overcommit_memory=1
把所有可用的物理内存都分配给你
vm.swappiness=10
尽量使用磁盘内存,不要放到swap中去
vm.max_max_count=655360
保证中间件可以开启足够多的线程
ulimit=1000000
最大文件连接数,保证网络通信和磁盘io
JVM
大体就是优化jvm相关参数,rocketmq默认是g1回收器
MQ核心参数
sendMessageThreadPoolNums=10
内部用来发送消息的线程数,可根据机器配置相应调整
压测
测试出最高负载
即MQ的TPS和机器的资源和负载之间取得一个平衡
需要关注的影响点
RocketMQ的tps和消息延时
cpu负载情况
内存使用率
jvm gc频率
磁盘io负载
网卡流量
sar -n DEV12
应用
发送消息
同步发送
SendResult send = produce.send(msg)
异步发送
可以调整失败重试次数
produce.send(msg, new SendCallBack(){
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
单向发送
produce.SendOneWay(msg);
消费消息
push
监听器监听是否有消息待消费
pull
while循环不断拉取
秒杀场景处理
CDN+Nginx,Lua脚本+Redis三级缓存处理静态页面
下单前加入答题模块,避免脚本疯狂调用接口以及错峰
独立部署一套秒杀响应的服务
redis控制扣减库存,快速响应
抢购完毕时,Lua脚本过滤无效请求
瞬时流量进入rocketmq削峰
技术选型
考察点
可靠性
业内常用MQ
集群化支持
QPS-吞吐量
常用功能是否完备
横向对比
RabbitMQ
优点
集群化部署
高可用
消息不丢失
支持功能较完备
缺点
吞吐量较低
集群化加机器比较麻烦
erlang语言开发,不变阅读和修改源码做定制化
子主题
总结
适用于不需要太高吞吐量,不做定制化修改的中小型公司
kafka
优点
吞吐量极高
支持集群化部署
高可用
缺点
丢消息
功能单一
总结
适用于需要高吞吐量,允许适当的消息丢失,不需要太高级功能|日志采集|标杆
rocketMQ
优点
阿里出品
集群化部署
高可用性
消息不丢失
功能完备
高吞吐量
java语言开发
缺点
官方文档内容较为简单
总结
适合广泛使用
拓展
企业级权限控制
通过在broker端放一个acl权限文件,在里面规定好权限,哪个消费者对应哪个topic有什么操作权限等
生产环境消息轨迹追踪
broker的配置文件里开启traceTopicEnable=true
启动broker时,会自动创建一个名为rmq_sys_trace_topic的topic存放消息轨迹记录
通过rocket-console上面的消息轨迹,创建查询任务,也可以看到消息的轨迹
MQ百万消息积压处理
一般如果消息不重要的话就在consume上直接释放掉
如果topic的messageQueue设置的比较多,比如设置了20个,consume实例只有4个,那么每个consume实例对应5个messageQueue,这个时候可以申请临时加机器,增加consume实例为20个,达到快速消费的目的
如果messageQueue设置的比较少,比如只设置了4个,那么这个时候就不能通过加consume机器来解决了,这时候就需要修改消费者代码了,不再消费者消费,而是把要消费的消息放到mq的另一个topic中,这个topic设置20个messageQueue,对应20个consume实例,进行消费
金融级系统设计MQ崩溃高可用方案
try-catch机制,发送消息到MQ,失败的话捕获,重试3次,依旧失败则认为MQ集群崩溃
这个时候需要把消息有序的放到DB/NO SQL中,待MQ集群恢复正常后,再进行消费
注意:此方案保证消息写入有序
MQ限流方案
防止程序bug等导致疯狂写入MQ消息的情况
可采用令牌桶算法等来进行限制
避免因此种情况导致MQ集群挂掉
MQ迁移方案
双写+双读方案同步数据
延时消息
应用场景
电商场景下,下订单15min未付款则关闭订单
针对此种场景,不能频繁扫描db,数据量太大
可以通过MQ的消息延迟机制,现将订单消息发送到MQ,设定15min后消费者可见这条消息
订单服务消费者获取到MQ消息,此时查询订单状态,若仍旧为未付款,则关闭订单
message.setDelayTimeLevel(3)也就是延迟级别,跟消息重发机制那个一样
Tag实现消息过滤
设置tag,consume进行消费的时候可以根据tag进行过滤
consume.subscribe("TopicOrderDbData","Table A || Table B")
consume.subscribe("TopicOrderDbData",MessageSelector.bySql("a > 5 and b = 'abc'"))
RocketMQ还是支持比较丰富的数据过滤语法的
1.数值比较,比如:>,>=,<,<=, BETWEEN, =;
2.字符比较,比如:=,<>, IN;
3.IS NULL或IS NOT NULL;
4.逻辑符号AND, OR, NOT;
5.数值, 比如:123,3.1415;
6.字符, 比如:'abc',必须用单引号包裹起来;
7.NULL,特殊的常量
8.布尔值,TRUE或者FALSE
顺序消息
场景
多个consume消费消息,导致消息消费无序
解决方案
取模选择对应messageQueue+批量等待机制
首先要保证消息是有序进入MQ的
消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue
consume消费消息失败时,不能返回reconsume——later,这样会导致乱序
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
消息幂等
消息重复的场景
系统间调用的重复机制
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
上述B系统后有个将消息放入MQ的操作
手动提交offset未完成
消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
重试机制导致的问题
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
解决方案
produce
业务判断法
操作方法
向MQ发送消息时,先确认消息是否在MQ中
缺点
MQ虽然有这个查询功能,但是不建议使用,性能不好
msg.setKeys(orderId);
broker会基于这个key, hash创建一个索引,放到indexFile中
可以根据mq提供的命令,用这个id来查询消息是否存在
未来得及将消息成功写入redis,系统就宕机了,仍然会有重复消息的问题
基于redis的幂等机制
Redis记录是否已成功发送此条消息
consume
业务判断法
db中数据的状态翻转,查一下就知道是不是消费过了
消息重发以及死信队列
场景
消费者消费消息时,db宕机,怎么办?返回success,这条数据就丢了,不返回?那要等多久
解决方案
try-cache捕获异常,返回reconsume_later
即通知MQ将消息放到针对你这个消费组的consumeQueue创建的重试队列里,过一会再让消费者消费
消费者组内的重试队列
例如Test_group,重试队列%retry%test_group
默认最多16次,每次重试间隔如下
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
死信队列
16次重试失败,则放入死信队列
%DLQ%Test_group
这个消息在rocket mq后台管理平台可以看见
后续可以开个后台线程,处理死信队列内的消息
消息零丢失
produce
事务消息机制->half-commit
流程
先发送一条half消息到MQ,看MQ是不是还活着
失败
一系列回滚操作,退款等
成功
说明MQ状态正常,继续做后续流程
执行本地逻辑,db插入等
本地方法失败
发送一个rollback消息到MQ,在内部op_topic中记录此条half消息为rollback状态
成功
继续走后续流程
提交commit到MQ
失败-订单系统感知
本地操作回滚,db数据状态回滚
MQ后台线程定时扫描half消息,长时间未commit/rollback则回调系统接口,若询问15次后仍未返回,则默认消息rollback了
查询db状态,已关闭则返回rollback给MQ,在内部op_topic中记录此条half消息为rollback状态
失败-订单系统认为已经成功
MQ后台线程定时扫描half消息,长时间未commit/rollback则回调订单接口,若询问15次后仍然未返回,则默认消息rollback了
查询db状态,已成功则返回commit给MQ,消息标记为commit并转入消费者topic中,此时消费者可见正常消费。
原理
initial
produce
发送消息->Topic(ConsumeTopic)
CommitLog
消息1
消息2
消息3
...
消息1
消息2
消息3
...
ConsumeQueue
(MessageQueue)
地址1
地址2
地址3
...
(MessageQueue)
地址1
地址2
地址3
...
consume
now
produce
发送消息->Topic(ConsumeTopic)
CommitLog
消息1
消息2
消息3
...
消息1
消息2
消息3
...
ConsumeQueue
(RMQ_SYS_TRANS_HALF_TOPIC对应的)
地址3
...
(RMQ_SYS_TRANS_HALF_TOPIC对应的)
地址3
...
ConsumeQueue(MessageQueue对应的)
地址1
地址2
地址3
...
地址1
地址2
地址3
...
consume
同步发送+不断重试机制
流程
先执行本地方法
本地方法执行完成后,同步调用MQ消息
MQ发送失败,重试三次
依旧失败的话回滚
kafka用的就是这种机制,但是这种机制仍然存在问题
MQ在本地事务之外
如果本地方法执行成功,但是发送MQ失败,在重试的时候,本地服务崩溃了就会导致本地方法已经执行完了,但是MQ没有收到消息,消息就丢了
MQ在本地事务之内
看似解决了问题,但重试会导致性能接口非常差,而且本地方法中可能还有写redis/es的操作,是无法通过事务回滚的。
broker
默认异步刷盘->同步刷盘
流程
initial
MQ接收到消息,写入CommitLog(OS cache)中,写入consumeQueue(OS cache)中
后台线程异步将os cache的数据刷入磁盘
消费者直接从os cache中拿到consumeQueue的偏移量offset对应的commitLog的消息,进行消费
如果此时broker宕机,消息就丢了,因为此时消息都在os cache中,未写入磁盘持久化
now
MQ接收到消息,写入commitLog(os cache)中,写入consumeQueue(os cache)中
强制要求同步刷入磁盘
flushDiskType由默认的ASYNC_FLUSH->SYNC_FLUSH
消费者直接从OS cache中拿到consumeQueue的偏移量offset对应的commitLog消息进行消费
数据落入磁盘中,宕机也不会丢失消息
master-slave
上述方案在磁盘坏了的时候,仍然会丢失消息,通过主从架构将消息备份到slave,多备份机制避免因磁盘损坏导致的消息丢失问题
consume
手动提交offset+自动故障转移
监听器监听消息
成功消费消息后,返回一个consume_commit标识
注意,消息消费不能异步
如果消息未消费完宕机,则本地事务回滚,未返回consume_commit标识给MQ
MQ感知到消费者机器宕机,且未完成消息消费,自动将消息发送给消费者组内的其他机器进行消费
消费者机器宕机时,MQ会执行rebalance, 重新分配messagequeue对应消费者机器
Netty 引入
Produce
TCP长连接
端口
Broker
Reactor主线程
Reactor主线程
SocketChannel
长连接发送消息
SocketChannel
Reactor线程池
线程1
线程2
线程3
线程4
...
线程1
线程2
线程3
线程4
...
Worker线程池
线程1
线程2
线程3
线程4
...
线程1
线程2
线程3
线程4
...
mmap内存映射技术+pageCache实现性能读取
正常IO读取数据会经过两次内核态切换,两次数据拷贝
用户私有进程->内核IO缓冲区->磁盘
mmap内存映射技术,映射用户私有进程地址和磁盘数据地址到pageCache不需要从OS cache拷贝至用户私有进程
不能太大,1.5G到2G之间
写入同理,直接进入pageCache,然后异步刷盘
除此之外,还有内存预映射机制,文件预热来优化性能
实时消息
Rocket MQ提供给我们push|pull 两种消息拉取机制
push
推送消息->即MQ主动将消息推送给消费者
Rocket MQ的push底层实现其实是一次次的pull request
这就产生了一个问题,如果消费者实时感知消息,就需要通过不断的轮询进行消息的拉取
consumer不断轮询,一次pull request不到,马上就进行下一次pull request,这无疑是非常消耗性能的。
pull
拉取消息->即消费者通过轮询机制从broker中拉取消息
可能有消息积压的问题
长轮询机制
由客户端发起pull请求,服务端接收到客户端请求后,如果发现队列中没有消息,并不立即返回
而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息
如果有,则用现在连接将消息返回给客户端,如果一段时间内还是没有消息,则返回空
好处在于,其本质还是pull,所以消息处理的主动权还是在客户端手里,客户端可以根据自己的能力去做消息处理
而服务端持有请求一段时间的机制由很大程度避免了空拉取,减少资源浪费
但是这种机制也有一定的问题,当客户端数量过多时,服务端可能在某个时间段内需要持有过多的连接,服务端压力比较大
不过一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
消息拉取
ConsumeGroup大部分在OS cache中,读取性能很高
CommitLog数据很多,不可能全放在OS cache中,先读取cache中的,再读磁盘
关于何时master让消费者去slave拉取消息
10w的消息,只消费了2w,还有8w待消费
master最多只能用10GB的OS cache,只能缓存5w条数据
还有3w就要去磁盘中拉取
此时他就会冉伟可能是master负载太高了,你去slave拉取吧
0 条评论
下一页