RocketMQ知识体系
2024-06-07 20:07:15 0 举报
AI智能生成
RocketMQ知识体系
作者其他创作
大纲/内容
分支主题
分支主题
高性能高可用的原因
发送消息负载均衡,且发送消息线程安全(可满足多个实例死循环发消息),集群消费模式下消费者端负载均衡,这些特性加上上述的高性能读写, 共同造就了RocketMQ的高并发读写能力。
刷盘和主从同步均为异步(默认)时,broker进程挂掉(例如重启),消息依然不会丢失,因为broker shutdown时会执行persist。 当物理机器宕机时,才有消息丢失的风险。另外,master挂掉后,消费者从slave消费消息,但slave不能写消息。
事务消息
消息发送及提交
1. 生产者发送half消息给broker,broker写入成功后返回ack。此时该消息对consumer不可见
2. producer接收到响应后开始执行本地事务,若事务执行成功,则producer向broker发送commit消息。只有producer顺利执行commit操作后consumer才可以消息该消息
3. 若事务执行失败则producer向broker发送rollback消息
事务补偿
若broker写入half消息后一直未收到producer的commit或rollback消息,broker会发起回查操作
producer收到回查消息后,检查消息对应的事务状态,根据事务状态重新commit或rollback
全链路消息零丢失方案
1. 发送消息到MQ零丢失
1. 同步发送消息 + 反复重试
2. 事务消息机制
2. MQ收到消息后零丢失
开启同步刷盘加主从架构同步机制
3. 消息消费零丢失
不使用多线程消费消息,处理业务逻辑完成再提交状态及offset
延迟队列
默认支持18个级别的延迟消息,messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列
消息的16次重试就是基于该队列实现,仅仅将messageDelayLevel的前两个级别去除而已
使用场景
对30分钟后未支付的订单进行取消
实现原理
1. producer端设置消息的延迟级别,消息属性DELAY中存储对应延时级别
2. broker收到消息后判断消息延迟级别,若大于0则备份该消息的原始topic及queueid,并将topic改为内存特定topic - SCHEDULE_TOPIC,queueid改为(延时级别-1)
3. ScheduleMessageService为每一个延迟级别单独设置一个定时器,定时拉取对应延迟级别的消息队列
4. 对队列的消息开始遍历,根据offset从commitlog中解析出对应消息,并从消息tagCode中解析出消息应当被投递的时间,用此时间与当前时间做比较
5. 若达到了投递时间则构建一个新的消息,并从根据消息属性恢复备份的topic及queueid,并清除消息延迟属性
保证消费顺序
producer端
让同一订单的消息进入同一个MessageQueue,在发送消息时通过重写MessageQueueSeletor的select方法实现
consumer端
同一订单消息由同一个consumer来消费,消费失败则返回SUSPEND_CURRENT_QUEUE_A_MOMENT而不是返回RECONSUME_LATER。可通过MessageListenerOrderly实现,consumer会对每一个ConsumeQueue使用单个线程来顺序消费
Consumer
疑问
1. 若消息消费出现异常如何处理
返回RECONSUME_LATER状态,broker会将该批次消息放入 %RETRY%<group_name>的重试队列中稍后进行重试,每次重试的时间间隔会不断增加,最多重试16次。若16次后仍然失败,则将消息放入死信队列%DLQ%<group_name>中,管理后台可查看。后续可通过定义该死信队列做降级处理
数据过滤
发送消息时可为数据设置tag属性,订阅时可以通过MessageSeletor.bySql("a>5 AND b= '123' ")进行过滤
过滤语法
数值比较
>,<,=,between, <=,>=
字符比较
=,<>, in
IS NULL IS 或 NOT NULL
逻辑符号 AND ,OR, NOT
布尔值 TRUE, FALSE
消费模式
pull
push(基于pull实现,采用的是长轮询方式)
Producer
发送消息流程
1. 从ns获取到的信息中找到topic对应的master broker
2. 通过负载均衡算法找到待发送的master broker,
并开始建立长连接,将消息发送到master broker
存储
采用Topic混合追加方式,即一个 CommitLog 文件中会包含分给此 Broker 的所有消息,不论消息属于哪个 Topic 的哪个 Queue
RocketMQ 用了顺序写盘、mmap。并没有用到 sendfile ,还有一步页缓存到 SocketBuffer 的拷贝。
kafka使用 sendfile
缺点
页缓存会定时刷盘,这刷盘不可控,并且内存是有限的,会有 swap 等情况,而且 mmap 其实只是做了映射,当真正读取页面的时候产生缺页中断,才会将数据真正加载到内存中,
优化
文件预分配
CommitLog 的大小默认是 1G,当超过大小限制的时候需要准备新的文件,而 RocketMQ 就起了一个后台线程 AllocateMappedFileService,不断的处理 AllocateRequest,AllocateRequest 其实就是预分配的请求,会提前准备好下一个文件的分配,防止在消息写入的过程中分配文件,产生抖动。
文件预热
MappedFile为防止缺页中断通过对该mappedFile的每个
Page Cache写入一个字节的数据
每写够1000字节就执行一次 Thread.sleep(0);
防止因线程长时间执行而引起GC
调用mlock锁定指定内存区域,防止被操作系统交换到swap空间
调用madvise建议内核尽可能多的将文件内容预读到内存
kafka与rocketmq在使用mmap零拷贝时的异同
rocketmq
作用在生产者发送消息过程
使用warmMappedFile函数实现预热,即通过mlock函数锁住1G的pageCache内存空间,然后使用madvise函数通知操作系统这部分空间即将被使用,请不要分配给其他应用程序使用。而锁住的这部分空间即可以与磁盘空间建立一一对应关系,当刷盘时间间隔 flushIntervalCommitLog=500ms到达时即可以将消息顺序的写入到硬盘中
kafka
作用在消息消费过程
临时存储池 TransientStorePool
transientStorePoolEnable默认为 false
内存级别读写分离
TransientStorePool 默认会初始化 5 个 DirectByteBuffer (对外内存),并提供内存锁定功能,即这部分内存不会被置换,可通过 transientStorePoolSize 参数控制
默认读写消息都是走的 pagecache, 为了缓解 pagecache 在高并发情况下的压力. 基于 DirectByteBuffer 和 MappedByteBuffer 的读写分离,消息先写入 DirectByteBuffer(堆外内存),随后从 MappedByteBuffer(pageCache)读取。
仅当 transientStorePoolEnable 为 true(默认 false)且 FlushDiskType 为 ASYNC_FLUSH 且当前 broker 不是 SLAVE 角色时,才启用 commitLog 临时存储池
刷盘方式(flushDiskType)
异步刷盘
flushlntervalCommitLog: FlushRealTimeService 线程任务运行间隔 默认 刷盘频率 500ms
flushPhysicQueueLeastPages : 一次刷写任务至少包含页数, 如果待刷 写数据不足, 小于该参数配置的值,将忽略本次刷写任务,默认 4 页 。
flushPhysicQueueThoroughlnterval :两次真实刷写任务最大间隔, 默认 10s
同步刷盘
GroupCommitService 每隔 10ms 刷盘一次
使用 requestRead 及 requestWrite 两个队列来处理实时刷盘与消息写入的同步问题, 待写入完成后再将requestWrite队列的数据 放到 requestRead 中, 而 requestWrite 被清空
ReBalance
每隔 20s 执行一次rebalance
获取所有 topic 信息与其消费组当前所有消费者信息, 计算当前消费者分配到的队列集合. 对比原先的分配队列与原分配队列. 若先新队列不包含原先队列, 则需将原队列停止并移除. 若原队列不包含新队列则需创建新队列
平衡算法
(AllocateMessageQueueAveragely)平均分配算法(默认)
(AllocateMessageQueueAveragelyByCircle)环状分配消息队列
(AllocateMessageQueueByConfig)按照配置来分配队列: 根据用户指定的配置来进行负载
(AllocateMessageQueueByMachineRoom)按照指定机房来配置队列
(AllocateMachineRoomNearby)按照就近机房来配置队列
(AllocateMessageQueueConsistentHash)一致性 hash,根据消费者的 cid 进行
角色
NameServer
负责管理集群中所有broker的信息,使得producer及consumer可用通过它来感知集群有哪些broker。NameServer可集群化部署。
Broker
代理服务器,主从架构实现多副本存储。分master及slave,slave会主动向master拉取消息进行同步
单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息, 如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
Producer
Consumer
Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳, 就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡(rebalance)。
分布式系统设计需要考虑的问题
1. 如何保证高可用
多副本
容灾-故障转移
2. 主从数据不一致如何处理
Broker
数据同步
由slave主动拉取
消费数据
消费者消费消息时先请求到master获取一批数据,master将数据返回的同时会根据当前负载及同步情况建议消费者下次开始应该从哪个broker拉取消息
master挂之后如何处理
1. 4.5版本以前需要手动运维,将slave设置为master,导致中间一段时间不可用
2. 4.5版本后引入Deldger机制(基于raft协议实现),使得一个master可对应多个slave
发送方式
异步
同步
单向消息
运维
查询消息积压数
1. 通过在控制台的 Topic 页面中找到指定Topic对应的 Consumer Manager,点击即可查看。
其中的 diffTotal 即消息的积压数量
2. 通过命令查询, 同样通过diffTotal查看积压数量
mqadmin consumerProgress -n 192.168.31.250:9876
mqadmin consumerProgress -n 192.168.31.250:9876 -g device-log-group
NameServer
独立部署运行,ns节点间无数据通讯。任何一个
节点都保存了所有的broker信息
数据模型
Topic
OS内核参数调整
vm.overcommit_memory
0
应用在申请内存时,OS会检查内存是否充足,
充足则分配,不充足则拒绝申请,进而导致系统出错
1
OS将所有可用的内存尽量分配给应用
2
表示内核允许分配超过所有物理内存和交换空间总和的内存。
vm.max_map_count
此参数会影响中间件系统可以开启的线程的数量,同样也是非常重要的
vm.swappiness
此参数用来控制进程的swap行为的,这个简单来说就是os会把一部分磁盘空间作为swap区域,然后如果有的进程现在可能不是太活跃,就会被操作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存空间腾出来,交给其他活跃运行的进程来使用。如果这个参数的值设置为0,意思就是尽量别把任何一个进程放到磁盘swap区域去,尽量使用物理内存。默认为60
ulimit
用于控制Linux上最大文件链接数, 默认1024
压测指标
1. TPS及延时
2. CPU负载
3. 内存使用率
4. JVM GC频率
5. 磁盘IO负载
6. 网卡流量
0 条评论
下一页