源码 - Nacos 源码解析
2022-02-06 09:45:09 1 举报
Nacos 源码解析
作者其他创作
大纲/内容
allInstances.addAll(persistentInstances); allInstances.addAll(ephemeralInstances);
deleteIP(instance);
集群数据同步: 异步批量同步方案
scheduled定时任务
Yes
GlobalExecutor.submitTaskDispatch(taskScheduler);
同步实例数据到文件
临时实例
获取客户端的服务实例缓存信息
com.alibaba.cloud spring-cloud-alibaba-nacos-discovery
DistroController#onSyncDatum
DelegateConsistencyServiceImpl#put
异步更新内存注册表
return \"ok\";
mapConsistencyService(key)
为了防止读写并发,nacos大量运行了CopyOnWrite思想防止并发读写冲突区别于Eureka放读写并发冲突的多级缓存结构, nacos的客户端感知及时性要强于eureka
2. 同步实例信息到nacos server 其他节点
除了客户端的主动拉取,服务端还有一个UDP的推送,双重手段确保时效性 (PushService)
InstanceController#list
实现了ApplicationListener接口的类,Spring启容器启动时会回调onApplicationEvent 处响应特定的事件
NacosAutoServiceRegistration
2) service.init();
找到jar包中的spring.factories文件中的org.springframework.boot.autoconfigure.EnableAutoConfiguration对应的自动装配类com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
实例不存在,则重新注册(比如网络不通、服务下线、重启等等)
/instance
调用server的发送心跳接口
persistentConsistencyService
serverProxy.registerService
启动cpuCoreCount个线程
getRegistration()就是NacosRegistration的实例
定时获取服务端数据并更新到本地的任务
service.srvIPs
bind(event)
获取注册过来的所有健康数据
while true
本节点是否是Leader
dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()
/** * font color=\"#f44336\
调用server的实例注册接口 by HttpClient
start()
NacosRegistration
和Ribbon相关
instance.setHealthy(false);
NacosServiceRegistry#register(Registration registration)
BlockingQueue tasks
nacos client
异步
ApplicationListener<WebServerInitializedEvent>
异步队列操作,提升性能
是
doSrvIPXT
服务发现
serviceInfoMap.get(key) - 客户端实例缓存Map
服务健康检查
同步实例数据给其他节点
否
extends
同方法调用
@Bean
tasks.take()
N-持久化实例
服务注册
/distro/datum
NacosServiceRegistry
taskDispatcher.addTask(key);
为空创建,否则直接获取
consistencyService.onPut
raftStore.write(datum);
KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService
入口
实现
nacos-naming子模块InstanceController#register
reqAPI(UtilAndComs.NACOS_URL_BASE + \"/instance/beat\
发布服务变化事件
Yes-临时实例
利用CountDownLatch实现了一个简单的raft协议写入逻辑,必须集群半数以上写入成功才给客户端返回
serviceRegistry就是NaocsRegistration的实例
register()
放完了数据,队列里的消息什么时候消费呢???初始化的时候启动然后while true
String url = \"http://127.0.0.1:\" + RunningConfig.getServerPort() + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/instance?\" + request.toUrl();
HttpClient.httpPutLarge(\"http://\
!success
ClientBeatProcessor#run
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
公共方法
存在直接获取Service
数据同步接口
调用server实例注销接口(异步)
服务端接口,未完待续
queue.offer(key); 异步操作
nacos server
HealthCheckReactor.scheduleNow(clientBeatProcessor);
查询
更新serviceInfoMap
阿里自己实现的AP模式-Distro协议
Notifier implements Runnable
通常优先看Auto的
如果注册的实例达到一定的数量,就批量同步给naocs集群中的 其他节点,或者举例上一次节点同步达到一定的时间也会会开始批量同步
NacosNamingService#getAllInstances
putServiceAndInit(service)
定时任务
将servcice对应的全量instance写入内存注册表
@PostConstructTaskDispatcher#init
NacosNamingService#registerInstance
InstanceController#beat
hostReactor.getServiceInfo
AbstractAutoServiceRegistration
TaskScheduler implements Runnable
retrySync(syncTask);
@DeleteMappingInstanceController#deregister
1) putService(service);
30秒没收到心跳,直接踢出该实例
不成功重试
异步任务ClientBeatProcessor更新客户端实例的之最后心跳时间
while true 循环从阻塞队列tasks里获取数据进行处理
RaftConsistencyServiceImpl#put
serviceRegistry.register(getRegistration())
keys.add(key)
UpdateTask
System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout() 30秒没收到心跳
executor.submit(notifier);
是否以 com.alibaba.nacos.naming.iplist.ephemeral. 开头
创建内存注册表结构
1. 将注册实例更新到内存注册表
构建注册信息 Map
最后会更新lastRefTimew为当前时间
nacos接入spring整合的部分
启动定时任务
@PostConstruct DistroConsistencyServiceImpl#init
instance.isEphemeral()ap模式
GlobalExecutor.submitDataSync
onApplicationEvent(WebServerInitializedEvent event)
NacosDiscoveryAutoConfiguration
调用sesrver的实例同步接口
/raft/datum/commit
看看父类、接口等
beatReactor.addBeatInfo服务心跳
本机HTTP -DELETE
service.processClientBeat(clientBeat)
/instance/list
client
/instance/beat
serverProxy.sendBeat(beatInfo)
缓存为空,调用server接口获取最新数据
ClientBeatCheckTask task
instance.setLastBeat(System.currentTimeMillis());
将临时的注册实例更新到Cluster的 empemeralInstances属性上去去,服务发现查找临时实例最终从内存中找的就是这个属性
serviceManager.getInstance == null
BlockingQueue queue = new LinkedBlockingQueue(128 * 1024)
serviceManager.registerInstance
ephemeralInstances = toUpdateInstances;
将注册实例信息更新到注册表内存结构中去
15秒没收到心跳,实例健康状态设置为false
Server
getPushService().serviceChanged(this);
new BeatTask(beatInfo)
System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut() 15秒没收到心跳
TaskScheduler#run
new ConcurrentHashMap
createEmptyService
HttpClient.asyncHttpPostLarge
0 条评论
下一页