Nacos-AP源码整理
2024-04-12 18:00:30 0 举报
nacos-AP协议源码整理,持续更新
作者其他创作
大纲/内容
DistroVerifyTimedTask(Runable)
数据验证前的准备task任务
获取其他服务节点列表serverList;获取协议数据类型types;for(types){ 根据类型获取数据处理器; 处理器中获取所有本节点处理的临时的应用客户端连接信息clients; // 循环服务节点 for(serverlist){ 获取执行引擎; 获取传输引擎; 创建执行任务task; 将task提交给执行引擎异步传输数据; }}
调用syncToTarget()
startLoadTask();
DistroProtocol
协议入口类,自动注入bean
构造函数() { 加载依赖bean: DistroTaskEngineHolder DistroComponentHolder ServerMemberManager 调用startDistroTask方法;}startDistroTask(){ // 开启验证任务 startVerifyTask(); // 开启同步任务 startLoadTask();}
ServerMemberManager
服务端节点管理器
当前节点的信息,地址、ip端口等;所有节点的信息,server地址集合,server集合等;构造函数.init(){ 注册地址变更监听器; 读取集群节点信息(ConfigFile方式、单机模式、配置文件方式);}监听方法onApplicationEvent(WebServerInitializedEvent){ 定时任务+异步+循环服务列表+gRpc方式; 向其他节点报告当前节点情况; 如果收到302没找到处理器则改为http调用;}提供服务节点的crud操作;
DistroClientComponentRegistry
组件初始化和注册
注入成Bean;@PostConstruct { 初始化数据仓和处理组件:DistroClientDataProcessor 初始化传输组件:DistroClientTransportAgent 初始化任务失败后处理组件:DistroClientTaskFailedHandler 注册到组件持有者中;}
DistroVerifyExecuteTask(Runnable)
数据验证task任务
for(clients){ 调用传输引擎进行数据传输;}
获取传输组件
调用
DistroLoadDataTask(Runable)
数据同步task任务
判断是否有集群其他节点信息,没有则等待,每秒判断一次;判断是否有协议数据处理组件,没有则等待,每秒判断一次;for(协议数据类型types){ 获取集群其他节点信息、传输组件、数据处理组件; for(集群节点){ 调用传输agent.getDatumSnapshot方法,获取目标节点的全量client信息; 调用数据处理器DistroClientDataProcessor.processSnapshot方法,存储到对应客户端类型的ClientManager的本地缓存中去; }}
DistroComponentHolder
组件持有者,存储各种类型的组件,每种协议数据都有单独的一套组件
DistroClientDataProcessor
协议数据处理器
2.x版本的处理器类型type:Nacos:Naming:v2:ClientData订阅事件{ ClientChangedEvent、ClientDisconnectEvent、ClientVerifyFailedEvent}处理事件{ if ClientVerifyFailedEvent { DistroProtocol.syncToTarget(协议数据key,[ADD]action,目标服务节点,延迟时间); } if ClientDisconnectEvent || ClientChangedEvent { if 断开的客户端不是临时客户端,且不是本节点管理的 return; else for(其他节点) { DistroProtocol.syncToTarget(协议数据key,[DELETE/CHANGE]action,目标服务节点,延迟时间); } }}// 处理查询到的快照数据processSnapshot(){ for(client快照数据){ 根据ClientId类型加载不同的客户端管理器ClientManager; 保存数据到不同的客户端管理器的本地缓存中; }}
@Bean
创建task
调用processSnapshot()
获取执行引擎
实现
启动定时任务;默认5秒执行一次
这个地方错了,并不是定时任务周期执行
TaskExecuteWorker
任务执行worker
内部线程类+阻塞队列+while循环执行任务
task提交给引擎
ApplicationListener
Spring监听器
WebServerInitializedEvent
DistroClientTransportAgent
数据传输引擎
DistroDelayTaskProcessor
延迟任务执行器
// 处理方法process(task){ if [DELETE]aciton 提交DistroSyncDeleteTask任务给}
startVerifyTask()
协议入口类提供的方法
入口
DistroTaskEngineHolder
协议执行引擎持有者,存放执行引擎
// 延迟任务执行引擎初始化DistroDelayTaskExecuteEngine引擎;// 正常任务执行引擎初始化DistroExecuteTaskExecuteEngine引擎;
调用getDatumSnapshot()
添加task
NacosExecuteTaskExecuteEngine
正常任务执行引擎
启动worker;添加worker;调用worker执行任务;
Subscriber
Nacos提供的订阅器
// 处理事件onEvent// 订阅事件subscribeType
获取数据处理器
NacosDelayTaskExecuteEngine
延迟任务执行引擎
添加task;删除task;合并task;启动定时任务不断地处理task;内部ProcessRunnable线程类.run方法.processTasks{ for(循环所有task){ DistroDelayTaskProcessor.process处理task; }}
启动定时任务;默认30秒执行一次
AP模式
获取服务列表
收藏
0 条评论
回复 删除
下一页