2、RocketMQ运行的核心原理
2020-11-26 15:12:02 3 举报
RocketMQ
作者其他创作
大纲/内容
代码逻辑
MessageQueue0
NettyServer网络组件
写入
channel网络连接
15、Consumer是如何均匀分配消息队列的1、Consumer启动的时候,有Rebalancer重平衡组件,专门负责Consumer的负载均衡。 2、Consumer在启动之后,都会向Broker进行注册,并且保持自己的心跳,让每个Broker都能感知到一个消费组内有哪些Consumer。 3、Consumer在启动之后,重平衡组件会随机挑选一个Broker,从里面获取到这个消费组里有哪些Consumer存在 4、重平衡组件一旦知道了消费组内有哪些Consumer之后,就好办了,就会把Topic下的MessageQueue均匀的分配给这些Consumer。一切都是平均分配的5、一旦MessageQueue负载确定了之后,下一步Consumer就知道自己要消费哪几个MessageQueue的消息。就可以连到那个Broker上去,从里面不停的拉取消息进行消费
核心配置组件
Consumer
4.NettyRemotingServer构造网络服务组件(核心)
13:均匀的把消息写入各个MessageQueue对应的ConsumeQueue
MessageQueue1
获取转发的消息
7、Producer启动准备相关资源1:构建好Producer后,在第一次发送消息的时候才会去拉取一个Topic的路由数据,包括这个Topic有几个MessageQueue,每个MessageQueue在那个Broker上,然后从中选择一个MessageQueue,跟那台Broker建立连接发送消息过去
MessageQueue
IndexFile
NameServer(独立运行)每10s检查心跳
OffsetStore
NettyServerConfig
2、BrokerController初始化1:Broker启动的时候加载了几个核心配置组件2:Broker自己的配置、Broker作为Netty服务器的配置(发送、接收消息)、Broker作为客户端的配置(和NameServer通讯)、Broker消息存储的配置3:加载Topic的配置、Consumer的消费Offset、Consumer订阅组、过滤器4:创建消息存储管理组件5:初始化线程池(发送消息过来、consumer拉去消息、回复消息、查询消息、管理broker执行命令、管理客户端、负责给NameServer发送心跳、事务消息、管理consumer),初始化后台线程(定时进行broker统计、定时进行consumer消费、offset持久化到磁盘、定时进行broker任务的保护、定时进行commitlog任务分发、
RouteInfoManager路由数据管理组件(加锁更新)
DefaultRequestProcessor网络处理组件(核心)
10、Producer如何与Broker进行通信1:从本地缓存或者NameServer获取到Broker地址后,封装一个Request请求。2:封装好Request请求里的信息(给消息分配全局唯一ID、对超过4kB的消息体进行压缩、生产者组、Topic名称、Topic的MessageQueue数量、MessageQueue的ID、消息发送时间、消息的flag、消息扩展属性、消息重试次数、是否批量发送消息、如果是事物消息则带上prepared标记等),通过Netty请求发送出去
系统A(生产者)
定时任务10s(清理线程)
BrokerConfig
注册broker刷新心跳
Topic
14、Consumer是如何被创建出来的1:Consumer启动,要和Broker建立长连接,指定自己要消费哪个Topic的数据2:Consumer启动创建Rebalancer重平衡组件和设置Consumer分组,假设ConsumerGroup里计入了一个新Consumer,那么就会重新分配每个Consumer消费的MessageQueue,如果ConsumerGroup里的某个Consumer宕机了,也会重新分配MessageQueue。3:Consumer启动创建拉取消息的API组件PullAPI,去拉取消息4:Consumer启动创建存储和管理Consumer消费进度Offset的组件OffsetStore,在拉取的过程中,要维护Offset消费进度
Netty网络连接
清理
5、NameServer如何处理注册请求1:NameServer基于Netty来接收Broker注册强求,交给DefaultRequestProcessor这个请求处理组件,注册Broker。2:真正的Broker注册逻辑在RouteInfoManager这个数据路由组件实现,最终Broker路由数据都会存放在RouteInfoManager内部的一些Map数据结构中
否
DLedgerCommitLog
ReputMessageService
拉取消费消息
15、拉取消费消息
6、Broker如何定时发送心跳以及故障感知1:BrokerController启动的时候,并不是仅仅发送一次注册请求,而是启动了一个定时任务,每隔30s就会发送一次注册请求2:发送到NameServer中的一个set集合中,每隔30s会封装一个新的BrokerLiveInfo放入map,覆盖之前的BrokerLiveInfo(里面有一个当前的时间戳,记录最近一次心跳时间)3:NameServer的RouteInfoManager启动了一个定时扫描不活跃Broker的线程(每10s扫描一次),如果一个Broker两分钟没有发送心跳,就会把这个Broker从路由数据表里剔除。
是
8、BrokerOuterAPI对外通信组件
2.初始化、解析
Salve Broker
NettyClent
Master Broker
使用
PullAPI
ServerBootstrap(netty网络服务器)
8、Producer是如何从NameServer拉取Topic数据的1:每次发消息的时候,先会去检查一下,要发送的这个Topic路由数据是否在客户端本地,如果不在,会发请求到NameServer拉取,然后缓存在客户端本地
ReBalancerImpl
3.NameServerController请求处理组件组装完成
Broker
11、Broker收到消息后是如何存储消息的1:Broker收到一条消息后把消息写入到MappedFile映射文件里去(内存),什么时候把内存刷入到磁盘里,根据配置的刷盘策略决定。然后刷入到CommitLog里(默认1GB大小,一个写满了写另一个,文件名是文件中第一个偏移量),一个Broker机器只有一个CommitLog文件,写入的时候加锁,串行,不会并发写入,并发写入会导致数据错乱2、Broker启动的时候开启一个线程ReputMessageService,每隔1毫秒,他会把CommitLog更新事件转发出去,让任务处理器去更新ConsumerQueue和IndexFile3、异步写入ConsumerQueue。因为一个Topic有多个MessageQueue,任何一条吸入消息都是写入一个MessageQueue的,MessageQueue其实就是对应了一个ConsumerQueue文件。4、同时异步吸入一个IndexFile文件。(把消息的key和消息在CommitLog中的Offset偏移量做一个索引,这样后续如果要根据消息key从CommitLog温经理查消息,直接根据IndexFile的索引来查
NettyServerConfig网络配置
1.NameServer启动
1、NameServerController初始化及启动1:加载一些配置2:初始化Netty服务器,创建Netty服务器工作的线程池3:创建定时后台线程(定时扫描哪些Broker没发送心跳)4:启动Netty服务器,监听9876端口,接收Broker和客户端网络请求
11、检查是否有缓存
3、BrokerController启动1:启动Netty服务器,用于接收网络请求2:启动BrokerOuterAPI组件,用于Nameserver去注册以及发送心跳3:去Nameserver注册
MessageStoreConfig
13、Broker的数据存储超过一定时间后,磁盘数据如何清理1:Broker启动的时候会启动一个后台线程,每隔10s,这个后台线程会去自动检查CommitLog,ConsumerQueue文件,如果磁盘没满,默认情况下,比较旧的超过72小时的未修改过的文件,默认情况下凌晨4点,就会出发删除文件逻辑2:如果磁盘空间不足了(超过85%使用率),立马执行删除文件逻辑,此时磁盘允许继续写入数据,3:如果磁盘使用率超过90%了,此时不允许在磁盘里写入数据,立马删除文件
Broker Group
5、Master Broker启动(一个JVM进程)
NettyClientConfig
17、Consumer是如何均匀分配消息队列的1. 对于一个Broker机器而言,存在他上面所有Topic以及MessageQueue的消息数据都是写入一个统一的CommitLog。2. 对于一个Topic的各个MessageQueue而言,就是通过各个ConsumerQueue文件来存储数据MessageQueue的消息在CommitLog文件中的物理地址,就是一个Offset偏移量。3. 对于一个Topic上的多个MessageQueue,他会均匀的将MessageQueue分配给消费组的多台机器来消费。 a. 例如“TopicOrderPaySuccess”这个Topic上有4个MessageQueue,这4个MessageQueue分布在两个Master Broker上,每个Master Broker上有两个MessageQueue b. 消费者组里有两台机器,Topic的多个MessageQueue会均匀的分摊给消费组内多个机器去消费。 c. 一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理。4. Push模式和Pull模式都是消费者主动发送请求到Broker机器去拉取一批消息下来 a. Push的时效性更好,一般使用的都是push模式来做的 ⅰ. 当消费者发送请求到Broker拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后立刻发送请求到Broker拉取下一批消息。 ⅱ. push模式下有一个请求挂起和长轮询机制, ⅲ. 当你的请求发送到Broker,结果他发现没有新的消息给你处理的时候。就会让请求线程挂起,默认是挂起15s,然后这个期间他会有后台线程每个一会就去检查一下是否有新的消息过来,如果在这个挂起过程中,如果有新的消息到达了就会主动唤醒过期的线程,然后把消息返回给你。6. Broker收到消费机器的拉取请求后,是如何将消息读取出来返回给消费机器的 a. 一个消费机发送了拉取请求到Broker,消费机声明这次要拉取MessageQueue0中的消息,然后之前我都没拉去过消息,所以这个就从MessageQueue0中的第一条消息开始拉取。 b. Broker就会找到MessageQueue0中对应的Consumer0,从里面找到第一条消息的Offset, c. 接着Broker就需要根据ConsumerQueue0中找到的第一条消息的地址,去CommitLog中根据这个Offset地址去读取出来这条消息,返回给消费机。7. 消费机如何处理消息,进行ACK以及提交消费进度 a. 消费者机拉取到一批消息之后,就会将这批消息回调我们注册的一个函数。 b. 当我们处理完这批消息后,消费者机器就会提交我们目前的一个消费进度到Broker上去, c. Broker会存储我们的消费进度,比如我们现在对ConsumerQueue0的消费进度假设在Offset=1的位置,那么它就会记录下来一个ConsumerOffset的东西去标记我们的消费进度。 d. 下次消费组只要再次拉取这个ConsumerQueue的消息,就可以从Broker记录的消费位置开始继续拉取。不用在重头开始拉取了。8. 如果消费组中出现机器宕机或者扩容加机器,会怎么处理 a. 消费组中出现机器宕机或者扩容加机器,就会进入一个rabalance的环节,也就是重新给各个消费机器分配他们要处理的MessageQueue b. 加减机器都会有一个负载重平衡的机制。
ConsumeQueue0
9、Producer是如何选择MessageQueue发送出去的1:一个Topic的数据往往是分布式存储在多态Broker机器上的,Topic本质是由多个MessageQueue组成.2:有可能一个Topic的多个MessageQueue在一个Broker机器上3:在DefaultMQProducerImpl.sendDefaultImpl()方法中,选择出要发送的MessageQueue。(设置自增长的index,对MessageQueue数量取模,获取到一个MessageQueue的位置,保证了有序性)
14、注册及发送心跳
异步写入
缓存
MappedFile
NameSrvConfig配置信息
处理请求
7、初始化BrokerController管控组件1、核心功能组件(管理Broker的请求处理,Consumer消费的Offset、Topic配置、Consumer发送过来的请求等2、管理后台线程池及磁盘数据3、NettyServer网络通信服务器4、后台定时任务线程池
4、Broker注册到NameServer1:调用BrokerOuterAPI发送请求给每个NameServer去注册。获取到NameServer的地址列表(请求头里包含了Broker的id和名称,请求体里包含了一些配置),全部注册完了才能接着往下走
6、BrokerStartUp启动组件
初始化
12、RocketMQ如何实现同步刷盘和异步刷盘1:同步刷盘,就是调用NIO下的API,force()方法强迫把写入内存的数据刷入到磁盘文件里去。2:异步刷盘,有个定时刷新的逻辑,最大间隔10s,就会去执行一次刷盘
本地Topic路由数据缓存
NameServer集群
12:拉取Topic元数据(一个Topic有那几个MessageQueue,那些MessageQueue在哪台Broker上)
16、Consumer是如何均匀分配消息队列的1、Consumer启动的时候,有Rebalancer重平衡组件,专门负责Consumer的负载均衡。 2、Consumer在启动之后,都会向Broker进行注册,并且保持自己的心跳,让每个Broker都能感知到一个消费组内有哪些Consumer。 3、Consumer在启动之后,重平衡组件会随机挑选一个Broker,从里面获取到这个消费组里有哪些Consumer存在 4、重平衡组件一旦知道了消费组内有哪些Consumer之后,就好办了,就会把Topic下的MessageQueue均匀的分配给这些Consumer。一切都是平均分配的5、一旦MessageQueue负载确定了之后,下一步Consumer就知道自己要消费哪几个MessageQueue的消息。就可以连到那个Broker上去,从里面不停的拉取消息进行消费
9、执行Broker注册10、后续每隔30s发送一次心跳
启动
收藏
收藏
0 条评论
下一页