rocketmq源码(三)——broker向namesrv注册流程
2024-09-10 11:24:48 0 举报
登录查看完整内容
在RocketMQ中,broker向namesrv注册是一个核心流程,用于建立broker与namesrv之间的通信关系,以便于实现消息的分发和消费。这个过程主要包括以下几个步骤: 1. 首先,broker启动时,会创建并初始化一个NettyRemotingClient对象,用于进行网络通信。 2. 然后,broker会通过NettyRemotingClient对象向namesrv发送一个注册请求,请求中包含broker的ip地址、端口号、broker名称等信息。 3. namesrv在接收到注册请求后,会进行一系列验证,如检查broker的合法性、是否存在重复注册等。 4. 在验证通过后,namesrv会将broker的信息存储在一个map中,并维护一个最新的broker列表。 5. 最后,namesrv会向broker发送一个注册响应,表示注册成功。同时,namesrv还会定期检测broker的状态,以确保broker的可用性。 这个注册流程涉及到的文件主要包括RemotingClient、NettyRemotingClient、RemotingCommand等。这些文件主要用来实现网络通信、消息编码和解码等功能。其中的修饰语主要包括private、public等,用于控制变量的可见性和函数的访问权限。
作者其他创作
大纲/内容
构建RegisterBrokerRequestHeader注册请求
rocketmq中所以远程通信都是封装成RemotingCommand,注册请求的code为
向RouteInfoManager注册broker信息
doRegisterBrokerAll
requestHeader.setBrokerAddr(brokerAddr); 设置 Broker 地址requestHeader.setBrokerId(brokerId); 设置 Broker IDrequestHeader.setBrokerName(brokerName); 设置 Broker 名称requestHeader.setClusterName(clusterName); 设置集群名称requestHeader.setHaServerAddr(haServerAddr); 设置 HA 服务器地址requestHeader.setCompressed(compressed); 设置是否压缩RegisterBrokerBody requestBody = new RegisterBrokerBody(); 创建注册 Broker 请求体requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); 设置 Topic 配置requestBody.setFilterServerList(filterServerList); 设置过滤服务器列表final byte[] body = requestBody.encode(compressed); 编码请求体final int bodyCrc32 = UtilAll.crc32(body); 计算请求体 CRC32 校验值requestHeader.setBodyCrc32(bodyCrc32); 设置请求头的 CRC32 校验值
cmd.getCode() = 103 processorTable中没有对应的处理器,所以使用默认的处理器defaultRequestProcessor处理器在namesrv启动的时候注册的
拿到NameServer 地址this.remotingClient.getNameServerAddressList();
code = REGISTER_BROKER= 103 走REGISTER_BROKER分支switch (request.getCode())
this.brokerOuterAPI.registerBrokerAll
NameSrv服务端
开启定时线程任务注册broker信息到namesrv
registerBrokerAll
处理客服端请求processMessageReceived
获取和创建channel连接final Channel channel = this.getAndCreateChannel(addr);
发起broker注册registerBrokerAll
netty入栈事件处理器NettyServerHandler
回调callback.callback(response);
封装成异步任务,使用remotingExecutor异步执行任务处理请求
发起网络请求channel.writeAndFlush(request)
broker客服端
获取处理请求的处理器defaultRequestProcessor
将requestHeader转换为requestHeader和registerBrokerBody方便后续数据处理
响应请求ctx.writeAndFlush(response);
RemotingCommand
clusterAddrTable:集群地址表brokerAddrTable:brokerData相关信息brokerLiveTable:存活的broker表filterServerTable:过滤服务器列表该方法的主要功能是注册一个新的Broker到NameServer中,更新相关的数据结构,包括集群地址表、Broker地址表、Broker活跃信息表和过滤服务器表。同时,处理Topic配置的更新和从节点的HA服务器地址设置。整个过程通过获取和释放写锁来确保线程安全。
异步任务run()
remotingServer
REGISTER_BROKER =103
0 条评论
回复 删除
下一页