Nacos单机+CAP架构的整体源码流程
2021-08-18 18:04:02 0 举报
Nacos单机+CAP架构的整体源码流程
作者其他创作
大纲/内容
NacosAutoServiceRegistration
添加key对应的操作到内存队列task中
定时执行5s后执行,间隔也是5s
InstanceController.register
NacosServiceRegistry
Notifier.run
创建Worker的时候启动了一个内部Worker线程
httpPost
instance.setLastBeat(System.currentTimeMillis()) instance.setHealthy(true);
handleFailedTask()
DistroTaskEngineHolder
distroProtocol.sync
processTasks
发布两个事件
B
数据提交
放入内存注册表
Client
beanName=consistencyDelegate
queue.put(task)
开启一个线程延时(5s)任务同步
根据服务名称去nacos服务器取所有的服务列表
A
createEmptyService
延迟过后执行run
注册的时候使用到了NacosServiceRegistry导入IOC的实例
这里源码精髓:nacos这里涉及到覆盖注册表的时候,因为可能注册表正在使用,所以这里nacos使用了copyonwrite的思想,在进行操作的时候先复制一份注册表到一个临时变量中,等待操作完成以后直接内存覆盖过去
监听了事件WebServerInitializedEventweb容器初始化事件
从服务列表删除
NotifyCenter.publishEvent(ValueChangeEvent)
服务注册
发送http 服务同步
C
spring启动会启动这个线程
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress())boolean result = dataProcessor.processSnapshot(distroData)
父类
发布一个客户端心跳检测任务
开启一个延迟任务
这个任务就是向机器内的其他集群发送服务健康的状态,动态修改服务的状态
HttpClient.asyncHttpPostLarge
processServiceJson(result)
这里是对集群的处理,大概意思就是说,在一个集群AP架构里面,心跳检测由一台机器做心跳检测就可以了,当所有的nacos服务器都启动,那么都会启动这个心跳检测的schedule,这个时候对serviceName做hash,然后与服务器数量取余得到的服务器就是心跳检测的服务器,那么这样不管几台服务器,那么相同的serviceName只会hash到一台nacos服务器做心跳
InstancePreRegisteredEvent
nacos的服务发现是在第一次调动的时候,也就是ribbo在第一次调用的时候调用的,也就是启动过后没有自动去发现服务,而是在ribbo第一次调用的时候触发的服务发现,nacos的服务发现底层代码是NacosNamingService#getAllInstances
@Bean
同步写实例到数据文件
最后达到B节点
NacosDelayTaskExecuteEngine.ProcessRunnable.run
开启一个健康检查任务(立马0s))
向leader节点发送实例注册
队列有数据过后进行处理也就是有新的注册任务了
注册的时候使用到了NacosRegistration导入IOC的实例
NamingProxy.getAllData(targetServer)通过http(/distro/datums)
通知客户端服务变动
Cluster.Cluster.updateIps
向其他server端同步
在心跳的时候如果发现这个实例不存在,那么可能由于服务重启被下线重启或者网络原因被下线,那么这个时候发现没有这个实例可进行重新注册
/instance
从更新内存注册表
发送服务注册
更新Cluster的注册表
队列的存取关系
/v1/ns/instance/service
通过UDP端口通知客户端
serviceManager.registerInstance
if (!getDistroMapper().responsible(service.getName()))
计算UDP端口然后查询服务端的服务列表(instance/list)
mapConsistencyService(key)
HealthCheckReactor.scheduleNow(clientBeatProcessor)
创建Service并且初始化
@PostConstruct public void init() { GlobalExecutor.submitDistroNotifyTask(notifier);}
是
创建一个执行引擎
bind(event)
DistroController#onSyncDatum
处理客户端的心跳
if (!isLeader())
new InnerWorker(name).start()
new DistroDelayTaskExecuteEngine()
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask
创建
DistroProtocol.sync
获取所有非自己的集群服务器
service.init()
如果同步失败,result=false就进行重试
InstanceController.beat
hostReactor.getServiceInfo
临时:ephemeralConsistencyService持久:persistentConsistencyService
DistroProtocol
超时15s没有收到client发送的心跳设置服务为非健康
NacosServiceRegistryAutoConfiguration
DistroConsistencyServiceImpl#onPut
parseInstance(request)
存放内存注册表
service.processClientBeat(clientBeat)
int index = servers.indexOf(EnvUtil.getLocalAddress()); int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());int target = distroHash(serviceName) % servers.size(); return target >= index && target <= lastIndex;
loadAllDataSnapshotFromRemote(each)
instance.setHealthy(false)
true
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
/raft/datum/commit
这里利用了内存队列的不同线程处理思路,一个线程存,一个线程取,达到性能的最优
HttpMethod.DELETE
raftStore.write(datum);
启动线程
startDistroTask()
节点之间相互同步节点状态如果有节点挂了,那么会影响到心跳任务计算机器的算法
执行同步
发送远程注册是,client创建的serviceName的格式是:分组名称 + @@ + serviceNameDEFAULT_GROUP@@mall-order
根据key是否包含ephemeral.
客户端本地注册表
spring-cloud-starter-alibaba-nacos-discovery-2.2.5.RELEASE.jar
TaskExecuteWorker worker = getWorker(tag); worker.process(task);
触发事件处理
raft协议的实例注册
创建一个空的Service
持久实例,走CP架构
PushService.onApplicationEvent
发布一个服务准备注册事件
内存注册表
超过30s没有收到心跳则删除这个服务
service = new Service();...putServiceAndInit(service)
/v1/ns/instance/beat
注册
InstanceController#deregister
udpPush(ackEntry)
解析json放入本地注册表
注册实例封装到Datum中
其他机器的server端
false
boolean result = distroComponentHolder.findTransportAgent(type).syncData
raft协议的实现cp架构的实现(nacos这里简单的实现,当注册实例为持久实例时走CP架构)
ServiceChangeEvent
distroMapper.responsible(serviceName)
调用A(服务变动事件)也就是通过UDP发送给client
putService(service)
根据http请求参数创建Instance实例
利用countDownLatch简单实现了一个,必须半数以上成功才会返回给客户端的半数以上的数据同步协议raft
ServerStatusReporter.run
通过@Component导入
这里的B 就是DistroConsistencyServiceImpl#onPut
运行实例同步任务
onPut执行完成时
AbstractAutoServiceRegistration
初始化
实例选择,是临时还是持久?
获取添加队列
addInstance
processor.process(task)
服务发现
return
将封装后的实例datum放入dataStore中
将任务放入hashmap
HTTP POST
for (Member each : memberManager.allMembersWithoutSelf()) {
本节点是否是leader节点
客户端服务注册(服务启动时注册)
这里解释下为什么要使用udp,我们都知道upd是不可靠的,可能会丢消息,但是性能高,客户端本来就有服务发现机制,所以即使这里的upd不能通知到也有一个客户端的服务发现机制兜底,但是正常情况都能通知到,这样服务端和客户端就保持了服务状态的准实时同步
Notifier notifier = new Notifier()
注册服务实例在集群间的数据同步
队列中添加一个DistroSyncChangeTask任务
ServerListManager.init
5s后的执行间隔
NamingProxy.syncData
发布一个服务变动事件
InstanceRegisteredEvent
Datum(封装对象)instancekeytimestamp
发布一个事件更新内存注册表
删除服务
InnerWorker.run
ClientBeatCheckTask.run
触发事件调用
Runnable task = queue.take()
临时实例,走AP架构
BeatTask#run
ServiceChangeEvent:服务变动事件InstanceHeartbeatTimeoutEvent:心跳超时事件
load();
ClientBeatProcessor#run设置心跳最后时间设置健康状态为true
同步发起的机器就是做心跳检测的那台机器
否
onApplicationEvent
发布一个服务注册完成事件
reqApi
ServiceManager.init
更新本地注册表
开始数据同步
通过resful发送服务注册
handle(pair);
RaftConsistencyServiceImpl#put
ServiceReporter.run
deleteIp(instance)
发送httppost请求
发送心跳
服务发现的自动装配jar
取出任务
DistroConsistencyServiceImpl.put
serverProxy.sendBeat
ephemeralInstances =toUpdateInstances
NacosRegistration
DistroSyncChangeTask#run
将service对应的实例写入内存注册表
开始实例同步
节点状态同步任务
ApplicationListener
register();
对于AP架构的onPut执行完成以后,需进行其他节点的数据实例同步
如果是临时实例instance.isEphemeral()添加一个心跳检测
是否同步成功,同步成功直接return,否则继续选择下一台机器进行同步
/distro/datum
Nacos集群中新节点启动过后会开启一个任务去其他节点同步实例数据
内存队列的操作
serverProxy.registerService
更新Service内存注册表
this.start();
NacosNamingService.registerInstance()
serviceManager.removeInstance
得到所有的实例
DelegateConsistencyServiceImpl.put
service.allIPs(true)
自动装配入口
spring.factories中EnableAutoConfiguration的NacosServiceRegistryAutoConfiguration
result=true
出队
NacosNamingService.getAllInstances
upd端口是服务发现时候传入的
serviceManager.registerInstance
DistroLoadDataTask#run
DistroMapper.responsible判断心跳检测的服务器是否是本机,返回true表示为本机检测
0 条评论
回复 删除
下一页