rocketmq源码(二)——broker启动流程
2024-09-10 11:23:17 4 举报
Broker启动流程主要包括以下步骤:首先,初始化配置,加载配置文件中的参数,如端口号、存储路径等。然后,创建消息服务器,包括RemotingServer和NettyServer,用于处理网络通信。接下来,注册处理器,如AdminBrokerProcessor、ClientManageProcessor等,以处理不同类型的请求。此外,还需要初始化存储层,如commitLog、consumeQueue等,用于存储和读取消息。最后,启动定时任务,如持久化刷盘、消息存储管理等,以确保消息的持久性和一致性。此外,broker还提供消息查询、批量发送、消费进度管理等功能。
作者其他创作
大纲/内容
管理过滤规则,过滤消息consumerFilterManager
用于管理消费者(consumer)相关任务的线程池及其队列consumerManagerThreadPoolQueue
处理事务结束相关操作的线程池endTransactionExecutor
this.fileWatchService.start();
处理消费者拉请求,并且保持这些请求,直到有新消息可供消费或超时pullRequestHoldService
监听消费者组中消费者数量变化consumerIdsChangeListener
设置netty服务监听端口,10911 端口是用于 Broker 与客户端(如 Producer 和 Consumer)之间进行通信的默认端口
BrokerStartup.main(args)
监听消息到达事件messageArrivingListener
AUTO_CREATE_TOPIC_KEY_TOPIC= \"TBW102\"自动创建topic
this.brokerOuterAPI.start();
向各个服务组件注册各种处理请求的处理器this.registerProcessor();
处理客户端心跳检测相关操作的线程池heartbeatExecutor
this.pullRequestHoldService.start();
this.fastRemotingServer.start();
读取文件:/工作路径/store/config/subscriptionGroup.json配置subscriptionGroupManager.load()
管理 Topic 的配置信息,包括 Topic 的名称、队列数量、权限等topicConfigManager
nettyServerConfig
处理心跳检测任务heartbeatThreadPoolQueue
用于处理消息放入操作的线程池putMessageFutureExecutor
2
在消息队列系统中通常用于处理响应或回复类型的任务replyThreadPoolQueue
创建BrokerConfig、NettyServerConfig、NettyClientConfig、messageStoreConfig配置类对象
初始化BrokerController
用于处理事务结束的请求endTransactionThreadPoolQueue
用于管理和处理与客户端管理相关的任务clientManagerThreadPoolQueue
管理和维护 Broker 的配置信息configuration
enableDLegerCommitLog为true,表示Broker使用DLedger模式,brokerId设置为-1
快速处理或跳过无法及时完成的请求,以避免对系统其他部分造成阻塞和性能影响brokerFastFailure
检查 NameServer 地址是否合法
this.brokerFastFailure.start();
处理消费者从 Broker 拉取消息的请求pullMessageProcessor
创建broker控制器createBrokerController
nettyClientConfig
处理客户端的所有请求,包括生产者发送消息的请求和消费者拉取消息的请求。创建remotingServer和fastRemotingServer
. . .
创建Broker核心服务组件
10912端口在RocketMQ中用于Broker的HA服务,主要用于主从Broker之间的数据同步和故障转移messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
处理消费者拉取消息请求的线程池pullMessageExecutor
定时记录Broker 统计信息BrokerController.this.getBrokerStats().record();
用于处理消息拉取(Pull)操作的线程池及其工作队列pullThreadPoolQueue
this.messageStore.start();
创建各种异步任务线程池,处理异步任务
处理消息查询操作的线程池queryMessageExecutor
读取文件:/工作路径/store/config/consumerFilter.json配置consumerFilterManager.load()
开启定时任务
brokerConfig
new BrokerController创建BrokerController对象
读取文件:/工作路径/store/config/topic.json配置this.topicConfigManager.load();
启动定时线程任务,向namesrv注册broker信息
用于处理 Broker 与外部系统(例如 NameServer 或其他 Brokers)之间的通信brokerOuterAPI
处理消费者管理相关操作的线程池consumerManageExecutor
用于处理消息放入操作的线程池及其工作队列putThreadPoolQueue
读取文件:/工作路径/store/config/consumerOffset.json配置consumerOffsetManager.load();
处理消息发送操作的线程池sendMessageExecutor
将准备好的properties配置设置到brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig配置对象中
用于管理和维护订阅组(Subscription Group)的信息和状态subscriptionGroupManager
Broker配置解析
设置brokerId,如果是SLAVE,则brokerId >=1,如果是MASTER,则brokerId=0且只能是0
启动BrokerControllercontroller.start();
用于消息存储管理创建messageStore
处理 Broker 与客户端(如生产者和消费者)之间的通信broker2Client
管理和维护 Broker 统计信息的关键组件创建brokerStats
controller.initialize();
1
初始化broker组件
构建broker命令行命令
处理客户端管理相关操作的线程池clientManageExecutor
this.remotingServer.start();
管理和维护消费者组消费偏移量的关键组件consumerOffsetManager
负责收集和管理 Broker 统计信息的组件brokerStatsManager
消息发送任务队列sendThreadPoolQueue
处理消息回复操作的线程池replyMessageExecutor
启动BrokerController中的各个组件
在消息队列系统中通常用于处理查询类型的任务queryThreadPoolQueue
用于管理主从同步过程,即在主(Master)Broker 和从(Slave)Broker 之间进行数据同步的机制slaveSynchronize
this.brokerStatsManager.start();
读取broker启动时-c指定的配置文件,生成properties结构配置对象
管理 Broker 相关操作的线程池的关键组件adminBrokerExecutor
管理客户端连接和清理资源的一个服务。其主要职责在于维护客户端连接的健康状态,并在发现连接不再有效或超时时进行清理和回收资源clientHousekeepingService
读取存储的消息数据(commit log)this.messageStore.load()
messageStoreConfig
this.clientHousekeepingService.start();
设置 Broker 配置文件路径,BrokerPathConfigHelper辅助配置文件解析类BrokerPathConfigHelper.setBrokerConfigPath(file);
0 条评论
下一页