Nacos源码详细完整流程图
2022-11-27 22:11:25 7 举报
Nacos源码分析
作者其他创作
大纲/内容
服务下线流程
3、注册监听器
服务注册队列处理流程1、 注册信息放进队列2、线程从队列取出数据3、更新本地缓存4、推送到客户端其他节点
distroMapper.responsible
/raft/datum/commit
临时实例健康检查任务:ClientBeatCheckTask
ServerStatusReporter.init()
调用server的实例发送心跳接口(HttpMethod.PUT)
主要是把SpringCloud的配置参数转换成Nacos原生的配置参数
hostReactor.processServiceJson(pushPacket.data);
定时任务:集群节点状态定时同步(新版不用了)
客户端处理完通过UDP反馈ACK
udpSocket.send(ackEntry.origin);
服务发现
raftProxy.proxyPostLarge(将注册请求转发到集群的leader节点)
服务注册接口
getAllTaskKeys();
继承于AbstractAutoServiceRegistration,并重写了register方法。其构造函数中把NacosServiceRegistry传到了父类的构造函数中。
ServerMemberManager.this.allMembersWithoutSelf();
调用server的服务发现接口(HttpMethod.GET)传入的参数里有客户端的udp端口,这个是方便服务端实例有变化了通过udp方式同步给客户端
/instance/beat
1、更新本地缓存
客户端启动时,在Spring回调监听器时,进行注册
NacosServiceRegistry
更新客户端本地缓存
开启CountDownLatch,数量为集群半数节点+1
TaskExecuteWorker#process
handleFailedTask()
serviceManager.registerInstance
udp方式将服务变动通知给订阅的客户端
只有集群模式且只有Leader才能发起心跳
getPushService().serviceChanged(this)
AbstractAutoServiceRegistration
创建了ProcessRunnable,并放到线程调度中
调用server的实例同步接口(HttpMethod.PUT)
/instance
new NacosDelayTaskExecuteEngine
这里只需要向一个节点拉取数据即可,因为每个节点都有全量数据
queue.take()
InnerWorker.run()
1、判断Leader的任期时间是否到期,如果没有到期,则结束if (local.leaderDueMs > 0) { return;}2、设置随机选举开始时间local.resetLeaderDue();3、开始投票选举:sendVote();
peers.decideLeader(peer);
请求完成后回调
/distro/datums
loadAllDataSnapshotFromRemote(each)
UDP通道
遍历集群节点列表:1、向节点发起异步投票请求2、请求回来后,决定是否是Leader
持久实例健康检查任务:HealthCheckTask
是
InstanceController.list
NamingProxy.getAllData(targetServer)
UpdateTask
集群Leader选举完成,发布的事件是LeaderElectFinishedEvent,它是BaseRaftEvent的子类,而BaseRaftEvent又是Spring中ApplicationEvent的子类
MemberInfoReportTask.executeBody
this.getRegistration()就是NacosRegistration的实例
1、创建内存注册表结构
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
serviceRegistry.register(this.getRegistration())
集群Leader节点心跳任务
同步的发起机器就是做健康检查任务的那台机器
返回的就是注册时写入的实例属性
bind(event)
否
serviceRegistry就是NacosServiceRegistry的实例
putService(service)
serverProxy.registerService
MasterElection.run()
依次把集群选举任务和心跳任务放到线程池
GlobalExecutor.submitLoadDataTask
consistencyService.onPut
RaftController.vote
queue.put(task)
1、遍历所有的节点,若voteFor不为空,则将节点的voteFor添加到ips中,并记录选票最多的节点2、如果此节点的投票次数已经大于半数节点,则把该节点设置为Leader,然后发布Leader选举完成的事件:LeaderElectFinishedEvent
distroProtocol.sync
RaftController.beat
关于AP和CP实现上的区别:客户端向集群请求时,是从集群节点中随机选择一个节点进行请求的。对于AP模式,所有节点都可以处理客户端的请求。节点处理完成后,会同步给集群其他节点。对于CP模式,在处理时,只有Leader节点才可以进行处理,当请求到本节点时,如果自己不是Leader节点,则把请求转发到Leader节点进行处理。Leader在处理的过程中,需要把数据同步到其他节点,当半数以上通过时,才算是真正的成功。
构造方法里开启数据同步任务
sendBeat
1、任期加1:local.term.incrementAndGet();2、投票给自己:local.voteFor = local.ip;3、设置自己为候选者状态:local.state = RaftPeer.State.CANDIDATE;
spring-cloud-alibaba-nacos-discovery.jar里的spring.factories文件里的EnableAutoConfiguration对应NacosServiceRegistryAutoConfiguration
同步写实例数据到文件
N
调用server的实例注销接口(HttpMethod.DELETE)
服务端通过UDP推送到客户端
Server
用服务名称hash后对机器数取模,选择集群里的一台机器执行任务
MemberUtil.onSuccess
start()
DistroDelayTaskProcessor#process
同步实例信息到集群其他节点主要逻辑:1、添加实例数据到Map中2、定时线程ProcessRunnable判断Map中有没有数据,如果有数据,则放到一个阻塞队列中3、定时线程InnerWorker从阻塞队列中取出数据,然后调用集群节点的同步接口进行数据的同步4、如果第3步调用失败,则进行重试逻辑5、如果重试还是失败,则有定时任务:服务实例信息在集群点间定时同步来补偿6、而集群节点收到同步请求,其实核心还是调用了同样的onPut方法进行处理的。为什么不直接放到队列里,而是还需要让另一个线程去拿到再放到队列,再由其他线程拿到队列数据进行处理呢?因为不同的task有不同的处理器进行处理,此处的处理就是放到队列中。主里有比较好的一个设计:任务执行引擎
NacosServiceRegistry.register
创建心跳后回到注册主流程
开启了临时实例健康检查定时任务:ClientBeatCheckTask
通过使用阻塞队列的方式,避免了死循环导致白白耗费CPU资源的问题
定时任务:集群节点状态定时同步
startLoadTask()
1、定时拉取
启动时
/instance/list
MemberInfoReportTask
服务下线接口
将service对应的全量实例instances写入内存注册表
beatReactor.addBeatInfo
如果某个实例超过15秒没有收到心跳,则将它的healthy属性置为false
POST
/raft/beat
@Bean
HeartBeat.run()
根据实例类型,返回不同的实例类型处理类
PushReceiver.run
定时线程:定时调用服务端实例列表接口查询最新数据并更新到本地缓存
遍历集群节点列表,寻找票数最多的节点,如果此节点的票数超过半数了,则该节点为Leader。最后发布Leader选举完成事件
DelegateConsistencyServiceImpl.put
HttpClient.asyncHttpPost
封装请求数据,并用GZIP压缩
推送到客户端流程
源码入口
onApplicationEvent
NacosRegistration
在Nacos-Server启动成功后,会定时给除自己之外的其他Member进行通信,检测其他节点是否还存活。如果通信失败,会将该Member状态置为不健康的,如果后续和该节点重新通信成功,会将该节点的状态置为健康,该Task与Responser的计算密切相关。这个任务的两个作用:一方面是为了同步当前节点信息,另一方面也是健康检查
1. Nacos是怎么和SpringCloud连接起来的2. 服务注册3. 临时实例心跳上报机制:客户端定期通过心跳机制上报状态4. 服务端临时实例健康检查:服务端定期检测客户端的心跳状态,根据状态对客户端做不健康标记及服务下线操作。5.服务端对持久实例主动健康检查6. 服务下线:主动触发、健康检查触发。7. 服务发现:启动时服务发现、调用时服务发现8. 推送到客户端其他节点:服务上下线等变更时通过UDP推送到客户端。9. 服务端集群节点数据同步:服务上下线等变更时,同步到集群其他节点。10. 服务端集群节点状态同步:集群节点可能会出现宕机的问题,其之间需要定期做数据及状态同步11. 服务端集群节点心跳任务12. 服务端集群启动流程:raft协议选举Leader流程、去集群其他节点拉取全量数据13. 客户端启动流程:当前服务实现发现,定期拉取服务列表的任务添加当前服务14. CP架构下服务注册流程15. 故障转移流程
客户端心跳上报流程
如果本节点不是Follower,则设置为Follower
mapConsistencyService(key)
循环从阻塞队列tasks里拿实例数据处理
注册两个监听器,一个是临时实例的,一个是持久实例的。在后续这个实例发生变更的事件通知中,会回调
/nacos/v1/core/cluster/report
继承
2、发布服务变化事件
service.processClientBeat(clientBeat)
NamingProxy.reqApi
1、加载持久化实例数据raftStore.loadDatums2、设置选举周期setTerm
发布事件ValueChangeEvent更新内存注册表
ServiceManager.init()
register()
GlobalExecutor.registerHeartbeat(new HeartBeat());
这里向集群其他节点发起投票请求前,先判断此节点是否就是Leader节点,如果是,则不需要发起投票请求,然后直接执行latch.countDown();,相当于我自己肯定投给我自己了。如果不是leader节点,则再发起投票请求
1
notifier.run()
事件监听 器RaftListener.onApplicationEvent
实现了SpringCloud注册中心规范:ServiceRegistry
服务注册主流程
调用server的实例注册接口(HttpMethod.POST)
这里有个疑问:从源码上可以看出,是先本节点处理,然后通过listener.onChange把数据推送给其他客户端,最后才是向集群其他节点发起投票请求。那么如果投票没有过半数,那之前本节点处理的和已经推送给客户端的数据不就有问题了吗?
如果同步不成功重试
sendVote
Y
NotifyCenter.publishEvent
如果本节点的任期大于请求的任期,则抛出异常 if (local.term.get() > remote.term.get()) {
deleteIP(instance)
instance.setHealthy(false)
同步实例信息到集群其他节点流程
这里,如果UDP推送失败,也不影响,客户端有定时任务定时拉取服务实例列表
getDistroMapper().responsible(service.getName())
通过@Component+@PostConstruct执行
DistroController.onSyncDatum
PushService.onApplicationEvent(ServiceChangeEvent event)
1、设置节点状态为UP2、发布节点变更事件:MembersChangeEvent
服务发现流程
service.init
定时任务:服务实例信息在集群点间定时同步
遍历监听器,调用每一个监听器的onChange方法
2、同步实例信息到nacos server集群其它节点
2
新节点启动后,从集群其它节点一次性同步数据
阿里自己实现的CP模式的简单Raft协议
local.state = RaftPeer.State.FOLLOWER;local.voteFor = remote.ip;
hostReactor.getServiceInfo
最后会更新lastRefTime为当前时间
tasks.take()
服务注册
数据同步接口
这是SpringCloud提供的注册中心标准规范。实现ApplicationListener接口的类,spring容器启动时会调用处理事件方法
集群节点状态同步任务
instance.setLastBeat(System.currentTimeMillis())
raftStore.write(datum)
load()
allInstances.addAll(persistentInstances);allInstances.addAll(ephemeralInstances);
/raft/vote
如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
开启了持久实例健康检查任务
向follower发起请求
1、Nacos与SpringCloud整合2、客户端启动时服务注册入口流程
serviceInfoMap(客户端实例缓存map)
serviceManager.removeInstance
Task.run()
1、本地缓存中删除该节点2、标记节点为SUSPICIOUS状态3、如果达到了最大失败次数(配置:nacos.core.member.fail-access-cnt,默认3次),则设置节点状态为DOWN4、发布节点变更事件:MembersChangeEvent
/distro/datum
异步向集群中其他节点发起投票请求
DistroConsistencyServiceImpl.init()
如果投票失败,则结束。如果投票成功,则latch.countDown();
遍历监听器
心跳接口
如果客户端是不健康状态,则标记为健康并发布实例变更事件
new DistroDelayTaskExecuteEngine(){super()//通过调用父类构造方法,创建了NacosDelayTaskExecuteEngine}
Follower节点如何确认Leader挂了,需要重新选举的?Leader定时给Follower发送心跳,Follower接收心跳时,执行了一个操作:local.resetLeaderDue();即重置了选举周期。在MasterElection的选举开始时,计算(leaderDueMs-500)是否大于0,如果大于0,则表示还没到选举期,选举结束。如果Follower长期没有收到Leader的心跳包,则这个leaderDueMs就不会变化,最终(leaderDueMs-500)一定会小于0,然后开始进行选举
集群Leader选举接口
DistroTaskEngineHolder
RaftCore.init()
GlobalExecutor.registerMasterElection(new MasterElection());
这里的queue为一个阻塞队列,task类型为:DistroSyncChangeTask
/raft/datum
healthCheckProcessor.process(this);
系统启动时,在NacosWatch中去服务端获取当前服务的实例列表,而对于其依赖的服务,其服务发现是在第一次调用服务接口时根据服务名去服务端获取的,这是一种懒加载机制
心跳任务:BeatTask
如果缓存为空,调用server接口获取最新服务数据
NamingService.registerInstance
如果重试仍然失败呢?如果重试还是失败,则有定时任务:服务实例信息在集群点间定时同步来补偿
Nacos和SpringCloud就是在这连接结合了
将临时的注册实例更新到了cluster的ephemeralInstances 属性上去,服务发现查找临时实例最终从内存里找到的就是这个属性if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances;}源码精髓:nacos这个更新注册表内存方法里,为了防止读写并发冲突,大量的运用了CopyOnWrite思想防止并发读写冲突,具体做法就是把原内存结构复制一份,操作完最后再替换回真正的注册表内存里去。Eureka防止读写并发冲突用的方法是注册表的多级缓存结构,只读缓存,读写缓存,内存注册表,各级缓存之间定时同步,客户端感知的及时性不如nacos
ephemeralConsistencyService
通过 CountDownLatch 实现了一个过半机制new CountDownLatch(peers.majorityCount()) 只有当成功的节点大于 N/2 + 1 的时候才返回成功。 public int majorityCount() { return peers.size() / 2 + 1; }
这个任务,主要是Leader发送心跳给集群其他节点,这些节点收到心跳请求后,更新Leader的任期时间。如果在指定时间内,这个任期时间没有更新,说明过期了,需要重新选举Leader了。这一块是在MasterElection中判断并处理的
ApplicationListener
获取客户端的服务实例缓存信息
是否是临时实例?
实现
onReceive():
client
udpSocket.receive(packet);
InstanceController.register
源码精髓:很多开源框架为了提升操作性能会大量使用这种异步任务及内存队列操作,这些操作本身并不需要写入之后立即成功,用这种方式对提升操作性能有很大帮助
注册服务实例信息在集群节点间同步任务
阿里自己实现的AP模式的Distro协议
udpPush(ackEntry);
NacosServiceRegistryAutoConfiguration
NacosAutoServiceRegistration
task.run()
客户端收到服务端UDP推送处理流程
集群启动流程
persistentConsistencyService
1、将注册实例更新到内存注册表
将新注册实例加入对应服务service的实例列表里去
DistroSyncChangeTask.run()
NacosDelayTaskExecuteEngine#processTasks
distroMapper.responsible(serviceName)
int target = distroHash(serviceName) % servers.size()
putServiceAndInit(service)
集群节点之间相互同步节点状态,如果有节点宕机了,集群其他节点会感知到并更新集群节点的状态,这个会影响心跳任务机器选择的计算
2、开启健康检查任务
Nacos集群新节点启动时向其它节点拉取数据同步流程
集群选举任务
标记Leader节点peers.makeLeader(remote);
service.srvIPs
RaftCore.receivedBeat
InstanceController.beat
doSrvIPXT
服务端对客户端做健康检查流程
服务端:持久化实例注册处理流程
DistroProtocol
/nacos/v1/ns/service/status
健康检查逻辑一、临时实例:1、采用客户端心跳检测模式,心跳周期5秒2、心跳间隔超过15秒则标记为不健康3、心跳间隔超过30秒则从服务列表删除二、永久实例:1、采用服务端主动健康检测方式2、周期为2000 + 5000毫秒内的随机数3、检测异常只会标记为不健康,不会删除
getPushService().serviceChanged(service);
ServerStatusReporter.run()
ServiceReporter.run()
MemberUtil.onFail
集群节点心跳接口
transportAgent.getDatumSnapshot(each.getAddress())
如果请求不是Leader发起的,则抛出异常if (remote.state != RaftPeer.State.LEADER) {
2、拉取后更新本地缓存
PersistentNotifier.onEvent(ValueChangeEvent event)
客户端线程任务:1、心跳任务:BeatTask2、接收服务端事件变更推送处理:PushReceiver3、故障转移:SwitchRefresher4、内存落盘:DiskFileWriter:写本地磁盘的任务,主要是备份服务信息,每天执行一次5、定时向服务端拉取数据:UpdateTask服务端线程任务1、对客户端临时实例的健康检查:ClientBeatCheckTask2、对客户端持久实例的健康检查:HealthCheckTask3、集群节点健康检查:MemberInfoReportTask4、心跳处理线程:ClientBeatProcessor5、集群节点状态同步任务(不用了?):ServerStatusReporter6、注册服务实例信息在集群节点间同步:ServiceReporter7、集群启动时,向集群其他节点拉取数据任务:DistroLoadDataTask8、UDP推送到客户端重试任务:Retransmitter事件汇总:1、InstanceHeartbeatTimeoutEvent,触发时机:ClientBeatCheckTask检测到服务不健康了。监听器:未找到2、ServiceChangeEvent,触发时机:服务上线、下线。监听器:PushService设计模式:1、工厂方法模式2、模板方法模式3、观察者模式4、单例模式5、委派模式(非标准名称)6、策略模式性能高的原因1、大量用到缓存、内存队列、异步、线程池2、锁优化:CopyOnWrite(写时复制)、缩小锁粒度3、事件机制设计借鉴1. 缓存机制2. 事件机制3. 设计模式4. 异步机制5. 数据一致性6. 锁优化:CopyOnWriter(写时复制)、缩小锁粒度7. 统一日志规范8. 任务执行引擎吐槽魔法代码:cp架构下,RaftCoreraft算法动态演示:http://thesecretlivesofdata.com/raft/
往阻塞队列tasks里放入注册实例数据
startDistroTask()
开启一个任务:ClientBeatProcessor,更新客户端实例的最后心跳时间
DistroLoadDataTask.run()
本节点是否leader
如果实例不存在重新注册(如网络不通导致实例在服务端被下线或服务端重启临时实例丢失)
createEmptyService
NacosServiceDiscovery.getInstances()
run方法
服务端:临时实例注册处理流程1、更新本地缓存2、放进队列,由队列异步处理3、同步实例信息到集群其他节点
是否成功
ProcessRunnable.run()
receivedVote的主要逻辑:如果发起请求的节点任期小于本节点,则将投票投给本节点,即:不投给发起节点。if (remote.term.get() <= local.term.get()) {.否则:1、将本节点的状态设置为Follower:local.state = RaftPeer.State.FOLLOWER;2、投票给发起节点:local.voteFor = remote.ip;3、设置本节点的任期为发起节点的任期:local.term.set(remote.term.get());4、把投票数据返回给发起节点
latch.await超过指定时间后,抛出异常
这里是一个遍历集群节点列表的循环,因为要同步到集群所有节点
这里的task,并不是一个队列,而只是一个Map。后续是有一个定时线程(ProcessRunnable)判断这个Map里有没有数据,如果有数据,就进行同步的处理。
GlobalExecutor.submitDistroNotifyTask(notifier)
延时执行的定时任务更新客户端的服务缓存
遍历集群节点列表,循环处理
handle(pair);
memberManager.update(self);
RaftCore.receivedVote
0 条评论
下一页