RocketMQ源码流程
2021-06-03 16:31:41 1 举报
RocketMQ源码流程
作者其他创作
大纲/内容
NettyRemotingServer是RocketMQ自己开发的网络服务器 底层还是Netty实现的ServerBootstrap
createBrokerController()中初始化BrokerController组件controller.initialize() 核心就是创建Netty服务器NettyRemotingServer
注册
客户端Producer
将NamesrvConfig和NettyServerConfig作为构造方法传递给NamesrvController
和nameservery一样,先脚本启动jvm类BrokerStartup类main,然后创建Controller核心组件createBrokerController(),然后start()启动
Producer.send() 会调用到DefaultMQProducerImpl类的sendDefaultImpl()中的tryToFindTopicPublishInfo 看看Topic路由信息是否在客户端本地缓存
创建NamesrvConfig(自身运行的配置参数)和NettyServerConfig(Netty服务器配置参数)配置类
MappedFile再写入CommitLog
netty网络通信模块
OffsetStore消费进度组件
IndexFile
请求处理线程池
如果namesrv启动带上-c配置文件地址 io解析并配置NettyServerConfig
ServerBootstrap(Netty服务器)
PullAPI消息拉取组件
BrokerController构造方法创建,参数就是4个配置
核心功能组件
Consumer
每隔30sBrokerLiveInfo覆盖之前的类 其中就有时间戳 如果距离上次心跳时间超时120s 删除这个Broker
注册Broker心跳刷新
handleDishFlush()中同步刷盘:底层还是NIO的MappedByteBuffer的force()方法异步刷盘:唤醒flushCommitLogService组件,每隔10s刷盘
ReputMessageService线程
NamesrvController请求处理组件
执行runserver.sh
调用createNamesrvController方法
NamesrvController启动start() 执行NamesrvController初始化initialize():构造Netty网络服务器NettyRemotingServer
向Broker注册,保持心跳,让Broker知道消费组内有哪些Consumer 重平衡组件随机选一个Broker获取有哪些consumer 然后最简单的就是平均分配了
RequestHead RequestBody
负载均衡取模选取一个MessageQueue:获取自增长index 取模 轮询的负载均衡
启动
创建DefaultMQPushConsumerImpl 调用start() 创建mQClientFactory 其封装了netty网络通信 pullMessage()拉取消息
每隔1ms,就将CommitLog通过doDispatch()消息转发到ConsumeQueue和IndexFile中
NameServerJVM进程
集群模式:只有一台机器获取广播模式:都会获取
写入内存 加锁写log
NettyServerConfig
选择一个MessageQueue:selectOneMessageQueue
BrokerConfig
使用
发送request 获取response Topic路由信息
NettyServer网络通信服务器
BrokerStartup启动组件
Broker
java命令+jvm参数NamesrvStartup类
执行Broker注册 doRegisterBrokerAll()
端口9876
RebalanceImpl重平衡组件 Consumer新加入或者宕机 重新分配MessageQueue
一个系统内部
获取到topic路由信息后
第一次进行注册 后续每隔30s发送 就是心跳
NettyServer的NettyRemotingServer网络服务器组件
启动start()
网络请求处理组件DefaultRequestProcessor
ConsumeQueueDispatcher和IndexDispatcher
ConsumeQueue
写入文件
本地没有缓存
创建BrokerConfig自己的配置、NettyServerConfig服务器配置、NettyClientConfig客户端配置、MessageStoreConfig消息存储配置
BrokerOuterAPI
拉取路由数据
distribution/bin中的mqnamesrv脚本
同样也是netty发送
初始化
如何创建?
netty网络通信
Channel网络连接
NettyClient
不同系统之间
如果命令行有-c/-p/-m的配置文件 解析并加载
创建NamesrvController接受网络请求
定时任务线程池
MessageStoreConfig
打包发送Request:给消息分配全局唯一ID,对超过4KB消息进行压缩,设置生产者组、Topic名称、MessageQueue数量、MessageQueue的ID、消息发送时间、消息的flag、消息扩展属性、消息重试次数、是否批量发送、事务消息带上prepared标记
不同系统应该设置不同消费组,不同消费组订阅一个Topic 其中一条消息 每个消费组都会获取;如果设置同一个消费组 一条消息只有一个系统能获得
删除文件:通过fileReservedTime来配置删除时间默认72小时 DefaultMessageStore类调用cleanFilesPeriodically() 1 如果磁盘没满 默认凌晨4点删除文件 2 磁盘超过85%使用率 立刻触发删除逻辑
处理请求
NettyServerConfig配置信息
本地有Topic路由缓存
NettyClientConfig
调用BrokerOuterAPI发送请求给NameServer brokerOuterAPI.registerBrokerAll() 里面真正走的是NettyClient
路由数据管理组件RouteInfoManager 用Map结构存放Broker路由信息
NamesrvConfig配置信息
启动jvm进程:NamesrvStartup main()
BrokerController管理控制组件
收藏
0 条评论
下一页