997_RocketMQ原理
2024-05-23 21:33:33 0 举报
RocketMQ原理剖析
作者其他创作
大纲/内容
topic_order
初始化nettyClient
brokername和地址映射表brokerAddrTable
committed
broker 01follower
CommitLog
3、4、5步
获取channel
心跳sendHeartbeatToAllBrokerWithLock()
启动写线程writeSocketService
slave返回ack offset
超过16次发送到死信
消息处理SendMessageProcessor
responseRemotingCommand
broker注册请求RequestCode.REGISTER_BROKER
IndexService
处理器注册绑定线程池registerProcessor()
nginx
-XX:InitiatingHeapOccupancyPercent=30,当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收,开始尝试回收一些垃圾对象,提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题-XX:SoftRefLRUPolicyMSPerMB=0,不要设置为0,避免频繁回收一些软引用的Class对象,这里可以调整为比如1000
启动netty
注册
1、发送half消息
每日凌晨4点清理文件只保留3天的文件
定时任务
拉取消息DefaultMQPushConsumerImplpullMessage
订单服务
选择一个
1
topic
回调接口
consumer
nettyClient
故障broker摘除cleanOfflineBroker()
nameserver
MQClientInstance核心实例
# echo 'vm.max_map_count=655360' >> /etc/sysctl.conf。vm.max_map_count影响开始线程的数量
push
注册到所有broker上
每台机器的producer启动80个线程发数据
处理成功?
DLedger模式 commitLog处理DLedgerCommitLogputMessage
slave commitLog同步
messageQueue 01
brokerslave
DLedgerServer集群模式下,commitLog由DLedgerServer管理
consumerQueue
netty客户端NettyRemotingClientinvokeSync
requestheader:RequestCode.REGISTER_BROKER
RocketMQ 消息存储
3、写入内部topic的的offset
秒杀服务1、扣减库存2、发送秒杀成功消息
DLedger日志存储dLedgerServer.handleAppend()
LVS
socketChannel
8
超过90%停止写入,删除文件
# echo 'ulimit -n 1000000' >> /etc/profile。修改文件打开数,Linux中一切皆文件,所以磁盘io,网络io都和此参数有关系
拉取broker路由信息
页面内容本地缓存
BrokerController
保存channelchannelTables.put
Dledger部署自动主从切换实现高可用
异步唤醒FlushCommitLogService
5
topicPublishInfoTabletopic和topic信息缓存表messageQueue和选定下标
dispatch
心跳集合brokerLiveTable
消息存储器DefaultMessageStore
本地DB宕机
1 写入msg5到leader
7 更新
channel
3、没有消息挂起15秒有消息随时唤醒
broker
reactor线程N
messageQueue0
messageQueue01
order_topic 订单topic
msgN+1
Y
定时扫描half线程
启动请求接收线程acceptSocketService
2、写入logcommitLog
commitLog写入CommitLogasyncPutMessage
broker 0master
1、正常流程half-执行业务-commit2、异常流程,未发送commit/rollback3、异常流程,broker未返回half消息ack4、定时扫描机制5、消息零丢失(broker端)broker设置同步+broker主从同步,大部分follower写入成功后,消息才算存储成功 6、消息零丢失(consumer端)手动提交ack7、消息重复: 发送给MQ重试; consumer没提交offset就宕机; 支付调用订单系统重试导致订单系统发送MQ重复。
sendMessage线程池处理业务:commitLog,comsumerQueue,indexlog等自定义线程数量
consumerQueue 01_2
根据不同角色(master or slave)设置特定参数
处理本地业务
同步封装GroupCommitRequest请求发送到GroupCommitService请求队列
CDN缓存预热
10s一次,遍历brokerLiveTable超过120s没心跳就认为broker宕机了
reactor线程8
实战场景百万消息积压:1、丢弃;2、扩容;3、换topic(如果messageQueue或者partition限制,可以新建多messageQeue和partition的topic,然后通过consumer直接发送到新的topic中)金融系统MQ宕机:涉及降级方案,将消息严格保证顺序的一条条写入本地store、数据库,nosql中,然后MQ恢复后,后台线程发送过去MQ限流:修改源码,增加令牌桶算法MQ迁移:双写+双读。运行一周观察读写数据是否一致,consumer从新的MQ消费的数据存储在其他地方,然后对比consumer处理结果,如果全部都一模一样,就可以将旧MQ和对应代码下线
如果是DLedger集群模式,修改broker的配置
1、发送方,路由到相同messageQueue2、接收方,1、用相同线程处理同一个订单;2、失败暂停而非重试
事务消息原理
相同线程处理同一个order处理本地业务
consumerQueue和index文件写入
调优broker jvm在runbroker.sh脚本
JVM GC
poll
心跳记录BrokerLiveInfo保存最后一次心跳
Netty服务器ServerBootstrap
索引文件 indexLog方便通过消息key和时间范围查找消息
broker列表brokerAddrTable
处理响应processResponseCommand
msg1
重试队列和死信队列1、重试机制2、死信机制
初始化配置
内存使用率
6
启动BrokerStartup
4 follower写入成功后返回ackleader的DLedger
处理失败
SendMessageRequestHeader
消息延时
特殊处理
初始化网络组件NettyRemotingClient
启动controller.start();
重平衡处理RebalanceImplupdateProcessQueueTableInRebalance
Redis集群
启动 NamesrvStartup
本地缓存过期后通过lua从Redis拿到缓存
2 启动
创建
库存服务
producer
拉取topic路由信息updateTopicRouteInfoFromNameServer()
默认处理器
consumerQueue 4
msg1地址
msgN+1地址
channelRead0channel请求处理
找到topic中messageQueue对应的consumerQueue
磁盘
consumerQueue异步分发线程定时1毫秒写入各topic对应的消息物理地址offset写索引文件
缓存
消息发送DefaultMQProducerImplsendDefaultImpl()
创建网络服务器组件NettyRemotingServer
broker 02slave
4、消费完,提交ack成功的回调函数中返回consume_success
push重试16次
MQClientAPIImpl
worker线程池(默认8个线程)处理请求前的准备编解码,ssl等
清理logcleanFilesPeriodically()
pull同步数据
网络组件netty承接broker请求的web服务器NettyServerConfig
库存数量缓存
启动传输线程groupTransferService10毫秒
BrokerConfig
自己作为producer和consumer的server的配置 port:10911NettyServerConfig
拉取消息组件PullMessageService
死信consumer
是否建立建立好连接
commitLog最大1G
创建createNamesrvController
reactor线程3
RemotingCommand
DLedgerServercommitLog
reconsume_later
机器2 应用1
-verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m这一堆参数都是控制GC日志打印输出的,确定了gc日志文件的地址,要打印哪些详细信息,然后控制每个gc日志文件的大小是30m,最多保留5个gc日志文件。
consumer(group)消费
初始化线程池1、发送消息线程池2、拉取消息线程池3、回复消息线程池4、查询消息5、心跳6、事务
从broker配置文件解析nameserver地址namesrvAddr
是
操作组件MQAdminImpl
-XX:-OmitStackTraceInFastThrow,有时候JVM会抛弃一些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来-XX:+AlwaysPreTouch,刚开始指定JVM用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给他,所以使用这个参数之后,就是强制让JVM启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配-XX:MaxDirectMemorySize=15g,RocketMQ里大量用了NIO中的direct buffer,这里限定了direct buffer最多申请多少,如果你机器内存比较大,可以适当调大这个值,如果有朋友不了解direct buffer是什么,可以自己查阅一些资料。
TopicRouteData
网络
消息拉取请求组件PullAPIWrapper
对外接口brokerOuterAPI
get
读取最新消息
启动broker连接线程HAConnection
1、pull/pushpush底层也是pull
messageQueue03
停止消费一段时间,而不是重试suspend_current_queue_a_moment
2 oscache+顺序追加写,直接返回ack高性能
consumerGroup
order_topic
reactor线程1
发送rollback消息写一条rollback记录
TopicPublishInfo
nginx集群
监听
处理请求processRequestCommand
写入
对RocketMQ核心参数进行调整
netty网络组件
机器1 应用1
-server server模式启动-Xms8g -Xmx8g -Xmn4g,堆大小及新生代大小-XX:+UseG1GC -XX:G1HeapRegionSize=16m,把G1的region大小设置为了16m,这个因为机器内存比较多,所以region大小可以调大一些给到16m,不然用2m的region,会导致region数量过多的-XX:G1ReservePercent=25,在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了
reactor线程池监听socket请求默认3个线程
6 缓存
7 pull到消息
index文件
存储地址$HOME/store/consumequeue/topic_order/messageQueue01/consumerQueue01
messageQueue 02
N
路由管理组件RouteInfoManager
2 初始化状态为uncommitted
网络server启动remotingServerfastRemotingServer
consumerQueue1
创建网络server组件remotingServer
是否异步
msg2地址
库存 lua
CommitLogDispatcherBuildConsumeQueue
4
uncommitted
加载详情页面
端口10911
选择一个messageQueueselectOneMessageQueue(取模+轮训)
响应完成
刷盘CommitLog.this.mappedFileQueue.flush(0)
3
消息封装AppendEntryRequest
nio类型确定select/pull、epoll
拉取nameserver地址fetchNameServerAddr()
开始拉取消息pullMessageService.start()
CommitLogDispatcherBuildIndex
2 同步写入返回ack低性能
mmap写入MappedByteBuffer内存映射MappedFileappendMessage()
发送消息到broker master上1、若发送失败就尝试发送到其他master上2、等待slave晋升为master再发送
raft协议+随机休眠投票竞争master
真正处理请求processMessageReceived
topic管理组件topicConfigManager
6、将事务消息offset写入业务topic
OP_topic half消息标记topic标记half消息是rollback还是commit
-XX:-UseLargePages -XX:-UseBiasedLocking:这两个参数的意思是禁用大内存页和偏向锁,这两个参数对应的概念每个要说清楚都得一篇文章,所以这里大家直接知道人家禁用了两个特性即可。
请求
获取nameserver地址
设置请求处理handlerNettyServerHandler
发送消息sendKernelImpl()
put
获取topic信息tryToFindTopicPublishInfo
reactor主线程建立连接创建socketChannel
msg5
1、nameserver集群2、broker dledger集群注册、心跳3、故障发现 心跳检测4、broker failover高可用5、producer发送消息6、consumer 拉取消息7、broker主从同步
consumerQueue文件
维护集群信息路由信息BrokerDataclusterAddrTable1、clustername2、brokerid3、brokerAddr4、brokerName
死信队列%DLQ%order_topic
创建channel
秒杀场景
定时任务第一次注册后续开始就是心跳registerBrokerAll()
3
请求处理器
初始化 start()
秒杀成功消息
订单数据库
1、同一个messageQueue只能被一个consumer消费,保证相同订单都让一个consumer消费2、用messageListenerOrderly单线程处理消息
1 start
5 leader收到超过半数follower的ack后就发送commit消息到follower
启动HAService管理broker集群高可用组件
IO线程
indexlog
consumerQueue 2
同步日志到slavetransferData()
定时调度任务,启动初始化好的组件1、consumerOffset定时持久化2、consumerFilter持久化3、打印水位4、dispatchBehindBytes落后commitLog分发
创建网络server组件fastRemotingServer
磁盘load1、consumer offset2、topic配置3、consumer filter4、consumer订阅
4.1 未返回响应业务方就放弃业务或者重试发送half消息
1、承载高并发和海量数据:messageQueue消息分片2、负载均衡:message写入策略:均匀写入3、producer遇到broker故障4、consumerQueue分发和存储机制5、高性能存储:commitLog6、高可用:基于raft消息同步机制7、打tag和增加property8、自动容错,broker故障后,producer自动回避一段时间,避免频繁发送消息到故障的broker上,参数:sendLatencyeFaultEnable9、延时发送,producer.setDelayTime设置延时级别,订单过期场景10、权限,broker开启acl并配置plain_acl.yml,构建producer和consumer的时候增加accountkey和secret
CPU负载
2、标记故障
trans_topic 事务消息内部topic
body
每个consumerQueue最多30w条共5.72mb,几乎全部换成在oscache中
nameserver配置NamesrvConfig
定时刷盘
创建netty服务Bootstrap
controller启动
否
默认PUSH消费者实现类 初始化DefaultMQPushConsumerImplstart
向所有nameserver注册registerBrokerAll
请求分发
保存集群broker列表
默认消息存储DefaultMessageStoreasyncPutMessage
定时线程ReputMessageService每毫秒读取commitLog,从上一次读取的位置reputFromOffset
分发到consumerQueue和index中DefaultMessageStoredoDispatch
监听9876
是否有库存?
MQ
如果slave offset > 请求的offset就说明传输完成
broker 02master
messageQueue 04
根据messageQueue的brokerName拉取 broker地址
# echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf“vm.overcommit_memory = 1”这个参数有三个值可以选择,0、1、2。0是os会判断内存是否够,有可能会拒绝你的申请。1是会将所有可用内存分配给你。
构建请求命令RemotingCommand类型为RequestCode.REGISTER_BROKER
重平衡组件RebalanceService
2
订阅push拉取pull
5、发送commit消息
页面缓存
RocketMQ集群原理
自己作为 nameserver的client的配置NettyClientConfig
均匀写入
消息存储DefaultMessageStoreasyncPutMessage
5.1 未发送commit消息,就靠broker的定时扫描+回调机制解决
#sar -n DEV 1 2千兆网卡每秒传输100m数据,压测500字节的数据每秒7w多条就会把网卡打满,并且这个过程中还要有master同步slave
发送消息
broker 01leader
初始化事务TransactionalMessageCheckService
获取处理完的订单,完成付完款
发送请求channel.writeAndFlush
重平衡 20s循环rebalanceService.start()
绑定
生产者管理ProducerManager
msg2
30s心跳
consumerQueue E
RouteInfoManagerbroker健康检查心跳检测scanNotActiveBroker()
网络请求处理器netty请求和响应都走这里ClientRemotingProcessor
重平衡RebalanceImpl#rebalanceByTopic
初始化请求处理器 registerProcessor() DefaultRequestProcessor
topic_notify
offset持久化persistAllConsumerOffset()
执行同步submitReplicaRequest()
消费者管理器consumerManager
磁盘不足85%立刻触发清理逻辑
consumerQueue3
consumer消息拉取处理组件pullMessageProcessor
初始化消息存储配置MessageStoreConfig
压测基于合理的参数进行压测(每条数据500字节,关系网卡流量)
心跳检测组件10s超120s任务故障
获取缓存
剔除broker
回答问题过滤机器人
操作本地业务
2、返回消息
如果过了一段时间消息依然是half状态,就回调接口确定消息是rollback还是commit,最多回调15次
3 DLedger将消息发送给follower的DLedgerServer同同时标记状态为uncommitted
1秒消息发送时添加timestamp,消息消费时当前时间戳和发送时间戳之间是差就是消息延时,可以计算一个平均值。
地址引用
启动读线程readSocketService
broker 2master
初始化权限ACL
返回秒杀成功,等待下单环节
加载consumerQueueloadConsumeQueue()
msg3
msgN
1 均匀写入
# free -h正常要保留30%以上的free内存
slave的同步进度push2SlaveMaxOffset
添加定时清理log任务addScheduleTask()
GET_ROUTEINTO_BY_TOPIC
从master拉取数据并返回拉取建议,下一次从slave拉取从slave上拉取消息,如果slave落后master,
1、订单id 对 messageQueue数量取模,保证相同的订单分发到相同的messageQueue,重写messageQueueSelectororderId % messageQueueCount
创建接收broker请求的ControllerNamesrvController
OS内核参数调优
对外请求组件启动brokerOuterAPI
GroupCommitRequest
顺序性
商品详情页服务
1..n
jstat -gc pid 1000每1000毫秒打印一次GC情况
处理业务
broker dledger集群
producer消息生产者
找到处理这种请求的processorSendMessageProcessor
JVM参数调优RocketMQ默认的JVM参数是采用G1垃圾回收器,默认堆内存大小是8G
异步刷盘
# echo 'vm.swappiness=10' >> /etc/sysctl.conf控制进程的swap使用比例,将内存的一些不常用的放入磁盘中。这个比例要调到很小,避免大量数据放入磁盘,使用的时候还需要load到内存,影响性能
获取库存
如果是broadcast模式就load本地offsetoffsetStore.load()
库存缓存提前预热
2 先去缓存中找
consumerQueue F
建立连接bootstrap.connect
TPS
sendMessageThreadPoolNums=16,RocketMQ内部用来发送消息的线程池的线程数量,默认是16,根据CPU来提升数量,dledger的示例配置文件:rocketmq/distribution/target/apache-rocketmq/conf/dledger
consumer消息消费者
初始化 brokerController
持有连接mapchannelTables
初始化网络配置
1、失去心跳
启动DefaultMQPushConsumerstart
初始化controller.initialize()
重试队列%RETRY%order_topic(1s 5s 10s..1m...2h)
设置并启动ServerBootstrap
consumer消费进度管理器ConsumerOffsetManager
commitLog所有topic的消息最大1G
页面 lua
执行刷盘submitFlushRequest()
4 返回成功响应
加载commitlogcommitLog.load()
NameServer
扣减库存
nameserver集群
消息加载messageStore.load()
模式:异步同步单向
consumerQueue01_1 记录commitLog消息offset+消息长度
消息持久化组件启动messageStore
更新抢购按钮标识路由到售罄页面
commitLog
topic和messageQueue和broker的路由表topicRouteTable
0 条评论
回复 删除
下一页