Nacos 客户端服务发现源码流程图
2022-03-29 17:15:52 0 举报
Nacos 客户端服务发现源码流程图
作者其他创作
大纲/内容
instance.setLastBeat(System.currentTimeMillis())
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api
加入队列
拼接URL
run方法
调用OnPut方法
result.addAll(clusterObj.allIPs())
放到tasks map中
NacosServiceRegistryAutoConfiguration
if (!result)
通过 命名空间Id 获取
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances
private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue
for (Instance instance : currentIPs)
设置间隔 默认5 秒
创建消息
初始化
添加
进行循环所有实例
是 DOWN 从 UP状态的Set节点中删除
核心方法 对服务名称做hash取绝对值 对服务数量取模 只有一台负责心跳机制如果其中有一台机器宕机,不算在取模数量之内 在com.alibaba.nacos.naming.cluster.ServerListManager#init方法 spring实例化后会对 @PostConstruct 注解标识的init方法 进行调用 init 又执行了run方法 进行节点状态维护
serverStatus(@RequestParam String serverStatus)
取出来的是 Runnable
获取服务
clientBeatProcessor.setService(this)
deregister(HttpServletRequest request)
查找数据处理器
定时发送心跳
核心注册逻辑 entryIPs 是新注册的机器
添加任务
创建 DistroDelayTask 任务
加入到阻塞队列 由 InnerWorker run方法执行
transportAgent.getDatumSnapshot(each.getAddress())
if (service == null)
根据ephemeral来判断 默认是true 复制了一份 写时复制思想
获取所有实例
if (!processor.process(task))
接收节点之间的心跳状态
循环所有要添加的 Instance 一般只有一个
DataStore
计算时间 [ 当前时间 - 最后一次心跳时间 > 超时时间 默认15秒]
Cluster
private Set<Instance> ephemeralInstancesprivate Set<Instance> persistentInstances
属性
ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name))
发送心跳信息
从缓存获取一个数据
更新服务
List<Instance> result = new ArrayList<>()
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor()
获取
发送数据
添加缓存
循环每个集群 一般只有一个 DEFAULT
putTask((Runnable) task)
for (Member each : memberManager.allMembersWithoutSelf())
处理请求 返回所有Instance 列表
parseInstance(request)
return instance
run()
获取所有集群节点
循环设置健康状态
通过socket 发送给客户端
集群才用到 集群节点同步数据
返回实例
HealthCheckReactor.scheduleNow(clientBeatProcessor)
return serviceInfoMap.get(key)
处理数据
处理快照
创建数据返回
Message msg = new Message()
keys.addAll(tasks.keySet())
把任务放到map中 由 DistroTaskEngineHolder 类的 DistroDelayTaskExecuteEngine 执行
核心方法 创建空服务
从 搭建集群 conf文件的 cluster.conf 文件获取所有server
构造方法中初始化线程
对服务名称做hash取绝对值
distroProtocol.onReceive(distroHttpData)
删除实例列表 标识 remove
serverListManager.onReceiveServerStatus(serverStatus)
@Bean
递归
DistroSyncChangeTask.run
注册 Put
distroComponentHolder.findDataProcessor(resourceType)
解析实例 封装为 Instance
serviceKey = toBeUpdatedServicesQueue.take()
dataProcessor.processSnapshot(distroData)
for (Instance instance : instances)
url = \"http://\" + serverIP + \":\" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/service/status\"
更新注册表后发布一个事件 保证实例的及时性
同步集群节点之间的状态延迟2000毫秒执行一次 线程执行 run
new Instance()
核心方法 处理客户端心跳
ProcessRunnable
获取任务
if (ephemeral) 是否临时
return JacksonUtils.toObj(result)
核心方法 健康检查
同步成功直接结束 循环所有配置,但是同步一次就停止
创建 Nacos自动服务注册器
拼接实例健康状态
return result
构建心跳信息
udpSocket.send(ackEntry.origin)
Nacos 客户端 发现
空的话 创建一个空服务
queue.take()
拼接 Url请求路径
服务注销
创建任务
processTasks()
DistroProtocol
判断是否包含 不包含直接进行put
核心方法启动
返回true false
核心方法
request 请求Api 传入参数
字符串解析为 RsInfo 对象
if (action == DataOperation.DELETE)
添加服务并且初始化
获取服务 ServiInfo
核心方法 把构建的对象 传入
return chooseServiceMap(namespaceId).get(serviceName)
返回所有 Instance 列表
注册流程结束
获取并返回所有实例
这里是节点心跳集群之间维护, 还有一个是节点实例状态维护
new Service()
加入到队列
返回结果
clientBeatProcessor.setRsInfo(rsInfo)
for (Instance instance : instances)
通过命名空间Id 获取服务 返回
创建Instances 放入 instanceList
加入 Map
返回
服务注册
执行run方法
service.getClusterMap().get(clusterName)
TaskExecuteWorker worker = getWorker(tag)
加入任务
从阻塞队列 take 服务之间的状态
实现
把 key value 包装为 datum 放入 dataStore put到 dataMap 中
if (!getDistroMapper().responsible(service.getName()))
Notifier
PushService.onApplicationEvent(ServiceChangeEvent event)
return true
beat(HttpServletRequest request)
InnerWorker 是 TaskExecuteWorker 子类
List<Instance> instances = service.allIPs()
返回列表
循环
如果是 CHANGE 上面传入的就是 change
request 请求
线程池定时执行发送心跳 默认延迟设置的周期 5秒 BeatTask是个线程 看run 方法
加载远端所有数据快照
同步节点之间实例状态
同步数据 syncData
执行的DistroSyncChangeTask run方法
获取 ServiceInfo
ApplicationListener
run 方法
instance.setHealthy(true)
dataStore.getDataMap()
msg.setData(status)
task.run()
if (!instance.isMarked())
memberAddressInfos.remove(newMember.getAddress())
for (String namespaceId : serviceMap.keySet())
persistentInstances = toUpdateInstances
空
GlobalExecutor.scheduleUdpSender(() -> {
呼叫server端 注册服务
ServerStatusReporter 的run 方法
接收服务器状态
设置心跳最新系统时间
ClientBeatProcessor 是个线程 看他的run方法
定时更新本地缓存
接收请求
Nacos 集群加入节点
Instances instances = new Instances()
把以前的赋值到 oldMap
distroHash(serviceName) % servers.size()
判断是不是 Down状态
从注册中心注册表 获取的数据存入 currentInstanceIds
if (code == NamingResponseCode.RESOURCE_NOT_FOUND)
类型
serviceInfoMap.get(serviceInfo.getKey())
if (action == DataOperation.CHANGE)
拼接Url
删除实例
重写 ApplicationListener 接口的方法
这里不是null 已经放了一个空的
临时节点
for (RecordListener listener : listeners.get(datumKey))
呼叫服务端
循环所有节点 发送健康状态
getAllTaskKeys()
从本地缓存获取 ServiceInfo 包含实例列表
allInstances.addAll(ephemeralInstances)
是
创建实例容器列表
找不到 进行注册
processServiceJson(result)
创建了一个线程池
each 要同步的数据和每台机器地址封装到 distroKeyWithTarget
this.start()
发起请求
核心方法处理
instance.setHealthy(valid)
handleFailedTask()
getBasicIpAddress(request)
创建了个 旧的Map
GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey))
DataOperation action = pair.getValue1()
解析json字符串
否
注册
获取数据
return chooseServiceMap(namespaceId).get(serviceName)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut())
监听事件
继承
设置服务为 this
bind(event)
startLoadTask()
new DistroDelayTaskExecuteEngine()
for (String namespaceId : allServiceNames.keySet())
handle(pair)
put到 consistencyService DelegateConsistencyServiceImpl
判断类型
DistroProtocol 类
distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX)
register(HttpServletRequest request)
startDistroTask()
getProcessor(taskKey)
获取服务下的所有实例
getPushService().serviceChanged(service)
找到相同ip 和端口的 实例进行返回
for (Instance ip : ips)
执行同步
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
核心方法 注册方法
解析请求返回的字符串到对象
如果返回找不到服务会重新注册服务
返回服务
ServiceReporter 的 run方法
DistroConsistencyServiceImpl
private volatile Notifier notifier = new Notifier()
@PostConstruct public void init() { // notifier 提交到线程池执行 GlobalExecutor.submitDistroNotifyTask(notifier); }
将更新的服务添加到队列
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout())
ServiceManager.init()
从远端加载数据
这里其实只是校验 因为 右边代码已经添加了空服务
是空
return namesMap
发送请求
String url = \"http://\" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/instance?\" + request.toUrl()
递归延迟执行
从注册中心注册表 获取 Instance
Service
ipArray.add(ip.toIpAddr() + \"_\" + ip.isHealthy())
核心方法 删除实例
拉取其他节点数据
instance.setHealthy(false)
for (String each : distroComponentHolder.getDataStorageTypes())
if (!serviceMap.containsKey(service.getNamespaceId()))
创建Http请求 要同步的数据对象
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
持久节点
同步数据
getAllServiceNames()
客户端本地缓存 服务
allInstances.addAll(persistentInstances)
更新心跳状态
像所有节点 发送msg 健康状态
获取实例 Instance
执行一个任务 ProcessRunnable run方法
return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE)
设置心跳包
进行循环
Nacos 客户端 注册
udpPush(ackEntry)
for (Instance instance : instances)
发送Http 请求进行注册
循环节点
发布事件,通知实例为健康
把原有的实例放到 oldMap
获取已有的 put 新的
通过key dataStore.get(datumKey) 获取 Datum 也就是 instance
执行 run方法
Datum<T extends Record>
public String keypublic T value
请求结果进行封装
创建一个 实例列表key ephemeral(临时节点) 默认为true
NacosNamingService.getAllInstances(String serviceName)
传入 UPDATE_INSTANCE_ACTION_REMOVE remove
方法结束看一下这个方法 集群实例之间的维护
核心服务发现方法
getPushService().serviceChanged(this)
创建任务到线程池 看 run 方法
return null
发起请求 查询服务列表 这里同时发送了一个 Udp端口 用来 服务端及时推送服务端注册销毁列表
更新实例
for (Object taskKey : keys)
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action))
list(HttpServletRequest request)
获取服务组名称
判断当前 ip 和端口和心跳包一致
注册服务
loadAllDataSnapshotFromRemote(each)
for (Instance instance : ips)
获取serviceInfo
异步 发起delete请求 本地路径 com.alibaba.nacos.naming.controllers.InstanceController.deregister
String url = \"http://\" + serverIP + \":\" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/operator/server/status\"
获取实例
service.allIPs()
创建任务 看run方法
参数加入命名空间
for (String cluster : clusters)
是否标记过
ServiceManager 在spring初始化后 会调用 @PostConstruct 标识的init 方法
创建客户端心跳处理器
load()
核心方法 执行 DistroDelayTaskProcessor.process
if (instance.getIp().equals(ip) && instance.getPort() == port)
更新状态
拼接url 提供server状态
循环所有配置节点
第一次是空
reqApi(UtilAndComs.nacosUrlBase + \"/instance/list\
同步集群之间实例数据的阻塞队列
listWithHealthStatus(@RequestParam String key)
获取 worker
memberManager.update(server)
distroDataStorage.getDatumSnapshot()
获取 key
服务发现
serviceStatus(HttpServletRequest request)
获取任务处理器 获取的为 DistroDelayTaskProcessor
属性 Map的 Vlaue
提交任务到线程池 run方法
获取临时节点还是持久节点标志
构建 Nacos Instance
String datumKey = pair.getValue0()
定时延迟递归调用发送心跳 传入周期时间 默认还是5秒
方法返回的列表 instanceList
参数加入命名空间 ID
onApplicationEvent(WebServerInitializedEvent event)
添加服务到注册表 service 目前还是空的 还没有实例
DistroLoadDataTask run方法
for (Instance ip : toUpdateInstances)
deleteIp(instance)
递归拉取服务列表
if (NodeState.DOWN.equals(newMember.getState()))
nacos 自动配置类 spring.factories 文件
getNacosInstanceFromRegistration(registration)
service.allIPs(ephemeral)
获取所有节点实例
getServers()
listener.onDelete(datumKey)
获取数据快照
接受请求
获取所有数据
service.allIPs(clusters)
集群实例状态维护方法
获取消息
传入 UPDATE_INSTANCE_ACTION_ADD add
BeatInfo beatInfo = new BeatInfo()
返回数据
超过15秒 健康状态改为 false
DistroConsistencyServiceImpl 实现类的 Put方法
线程执行
this.serviceRegistry.register(getRegistration())
如果之前标记过 把健不健康状态改为健康
拼接url
queue.put(task)
查找数据存储
创建了个空map
for (Member server : sameSiteServers)
循环所有获取的 keys
提交任务继续异步去操作 修改节点事件的装填
循环集群所有节点 allMembersWithoutSelf() 方法中删除了自己 也就是集群除自己所有节点
执行 http 请求发送
putServiceAndInit(service)
NamingProxy .reqApi(EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/instance/\" + \"statuses\
return target >= index && target <= lastIndex
UpdatedServiceProcessor 的 run方法
getIpAddress(request)
处理任务
take
getAllDatums()
如果是 remove
instanceMap.remove(instance.getDatumKey())
注册实例 NamingService 接口
List<Instance> instances = cluster.allIPs(true)
GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor())
ephemeralInstances = toUpdateInstances
返回实例列表
putService(service)
异步处理同步节点之间实例状态
如果获取的 Instance 是空 则注服务册
绑定
return new DistroData(new DistroKey(\"snapshot\
核心方法 加入实例
跳过自己节点
处理快照数据
放入请求解析后的 ServiceInfo
收到请求执行
request 发送请求
if (result)
如果不成功
如果状态 是down 从列表 server列表删除
RestResult<String> result = HttpClient.httpPutLarge( \"http://\
构造方法
如果是临时节点
获取集群下所有实例 true 是临时节点
return serviceMap.get(namespaceId)
currentInstanceIds.add(instance.getInstanceId())
service.processClientBeat(clientBeat)
改为健康 修改的是引用所以实例已经变为健康
计算时间 [ 当前系统时间 - 最后一次心跳时间 > 删除时间 默认30秒]
发送msg
再次循环
DistroTaskEngineHolder
http 获取数据
实现注册
获取所有实例名称 key 命名空间id val 对应的服务名称列表
service.allIPs(true)
private final BlockingQueue<Runnable> queue
reqApi(UtilAndComs.nacosUrlBase + \"/instance/beat\
return new ArrayList<>(instanceMap.values())
加载数据
return msg
不成功则重试
register()
创建服务对象
memberManager.allMembers()
把 datumKey action 封装一个对象 加入阻塞队列 注册完成 Notifier 本类是个线程 看run方法
接收节点之间的状态
for (Member server : allServers)
HttpClient.httpGet( \"http://\
永久节点
dataProcessor.processData(distroData)
service.init()
AbstractAutoServiceRegistration
发布心跳超时事件
创建实例 设置端口 ip ephemeral 集群
循环所有实例
if (serviceMap.get(namespaceId) == null)
NamingProxy.getAllData(targetServer)
再次加入到队列
distroComponentHolder.findDataStorage(type)
设置心跳信息
worker.process(task)
重写
instances.setInstanceList(instanceList)
这个是集群环境决定检测心跳方法
InnerWorker run方法
建造定时
空创建 ServiceInfo
0 条评论
下一页