Nacos注册中心
2024-09-05 16:05:03 0 举报
Nacos注册中心
作者其他创作
大纲/内容
EphemeralClientOperationServiceImpl#registerInstance
ClientSubscribeServiceEvent
return result
NacosServiceRegistry#register
services = client.getAllPublishedService()
继承
client = clientManager.getClient(clientId)
singleton = ServiceManager.getInstance().getSingleton(service)
if (null == serviceInfo) 本地为空
客户端依赖版本
AbstractNamingInterceptorChain#doInterceptor
推送消息给该服务所有订阅者所以PushDelayTask未指定目标Client
注册表 新增/删除 Service与代表其的ClientIdpublisherIndexes.get(service).add(clientId) / .remove(clientId)
添加Client的健康检查任务每个Client独有
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo())
获取服务下的所有Client挨个从Client的注册表publishers属性中取出Instance信息
查询服务实例请求处理ServiceQueryRequestHandler#handle
switch (distroDelayTask.getAction())
客户端断开事件
false
distroDelayTask = new DistroDelayTaskspan style=\"font-size:inherit;\
for (String outDateConnectionId : outDatedConnections)
注册服务实例
ServiceSubscribedEvent
先从本地serviceInfoMap中获取key的组成:groupName + \"@@\" + serviceName + \"@@\" + clusters (如果配置了集群名称才拼接)
服务订阅事件
DistroSyncChangeTask
if (response != null && response.isSuccess())
publishers.get(service)
AbstractClient#addServiceSubscriber / #removeServiceSubscriber
client = clientManager.getClient(each)
serviceDataIndexes.get(service)
DistroProtocol#syncToTarget
服务实例注册请求InstanceRequest
处理服务信息
init()
健康检查ConnectionManager#start
result = serviceInfoHolder.getServiceInfoMap().get(serviceKey)
任务最终放到了TaskExecuteWorker中的阻塞队列中:BlockingQueue<Runnable> queue然后通过内部类 TaskExecuteWorker@InnerWorker#run作为消费线程处理任务,最终调用到了Task的doExecute方法
connection.asyncRequest
deleteTimeout默认30秒
for (Member each : memberManager.allMembersWithoutSelf())
connections = connectionManager.connections
推送服务变化给订阅者NotifySubscriberRequest
KEEP_ALIVE_TIME = 20000L默认超过20s没有收到心跳的客户端加入到过期列表中
beatCheckTask =new ClientBeatCheckTaskV2(this)
构建DistroKey对象,组成:resourceKey:clientIdresourceType:\"Nacos:Naming:v2:ClientData\"
注销探活失败的客户端
ClientRegisterServiceEvent
InstanceBeatCheckTask#passIntercept
clients.remove(clientId)
PushExecutorRpcImpl#doPushWithCallback
this.clientProxy = new NamingClientProxyDelegate
false(取消订阅服务)
NotifyCenter.publishEvent(new InstancesChangeEvent
atomicInteger.decrementAndGet()
NamingClientProxyDelegate#queryInstancesOfService
ClientReleaseEvent客户端释放事件处理
HealthCheckEnableInterceptor#intercept
ejectOutdatedConnection()
PushExecuteTask#run
DistroProtocol#onReceive
client.recalculateRevision()
遍历除自己外的其他server挨个发送同步请求
定时调度5秒钟执行一次任务底层使用JUC下的ScheduledExecutorService实现调度
ServiceChangedEvent
只推送给当前订阅该服务的订阅者(其实就是本次注册服务的Client)
instance.setHealthy(false)
处理同步实例数据DistroClientDataProcessor#processData
NacosExecuteTaskExecuteEngine#addTask
ipPortBasedClient.init()
注销服务实例
Grpc查询服务实例请求
case CHANGE、ADD:
查询服务列表时会暴露一个udp端口给NacosServer用于Server端在服务实例列表有变化时主动通知Client
publisherIndexes 是2.1版本中的注册表,记录了服务与服务下所有的ClientId通过ClientId找到Client对象进而操作客户端Client对象里的publishers属性记录了Service与Instance读/写注册表是分开的,读的是另外一个缓存Map相较于1.4.1中整个注册表只有一个serviceMap,结构更简单,能更好的支持并发操作
心跳过期的修改实例健康状态为false
case DE_REGISTER_INSTANCE
client.removeServiceInstance(singleton)
添加异步任务
DistroSyncChangeTask / DistroSyncDeleteTask任务执行逻辑
case REGISTER_INSTANCE
从Client里拿出 publishers(客户端所对应的实例) 进行遍历
根据客户端传递的参数 subscribe 执行订阅/取消订阅服务逻辑
服务注册入口
PushDelayTask
实例心跳检查UnhealthyInstanceChecker#doCheck
AbstractClient#addServiceInstance
客户端改变事件
clientConnected(client)
DistroClientTransportAgent#syncData
true(订阅服务)
Client初始化逻辑ipPortBasedClient#init
从本地serviceInfoMap中获取服务,如果为空或者未订阅该服务,执行订阅操作
if (subscribe)
AbstractClient#removeServiceInstance
ServiceSubscribedEvent服务订阅事件
根据service从订阅记录表 subscriberIndexes 删除Client
publishers.remove(service)
dataProcessor.processData(distroData)
return serviceInfo
for (Service each : client.getAllSubscribeService())
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2022.0.0.0</version></dependency>discovery Jar包下/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
result = serviceStorage.getData(service)
onApplicationEvent
客户端取消订阅服务事件
ClientDisconnectEvent
ClientUnsubscribeServiceEvent
注册服务请求处理InstanceRequestHandler#handle
subscribers.remove(service)
getPushData(service)
AbstractClient#getInstancePublishInfo
超时的从Client的publishers中移除服务
serviceDataIndexes.containsKey(service)
实例过期检查ExpiredInstanceChecker#doCheck
实现并监听WebServerInitialized事件所以Nacos的服务注册是在Web容器初始化完成后执行的
消费队列任务
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service))
returnserviceInfoHolder.processServiceInfo(result)
beatTimeout默认15秒
ConnectionBasedClientManager#clientDisconnected
HealthCheckReactor.scheduleCheck(beatCheckTask)
构造方法
DistroSyncDeleteTask
EphemeralClientOperationServiceImpl#deregisterInstance
NamingGrpcClientProxy#doRegisterService
NacosAutoServiceRegistration
ApplicationListener<WebServerInitializedEvent>
client.getInstancePublishInfo(service)
客户端释放事件
ProcessRunnable#run
订阅服务
ClientChangedEvent
拦截都通过后,才执行检查任务
for (String each : getTargetClientIds())
添加异步任务,定时请求Server同步实例数据延迟一秒后执行,后续默认6s执行一次
对心跳超时的客户端主动发起一次探活请求
ClientDisconnectEvent客户端断开事件处理
instance = getNacosInstanceFromRegistration(registration)
处理服务端推送的变化数据NamingPushRequestHandler#requestReply
从ClientManager中的clients中移除
ClientChangedEvent客户端改变事件处理
case DELETE客户端 删除
true(正常情况执行)
subscriberIndexes 是订阅表,记录了服务与订阅该服务的ClientId集合当该服务有变动时,新增/删除实例 会通知所有订阅者
定时同步服务实例ServiceInfoUpdateService#scheduleUpdateIfAbsent
将 namespaceId、serviceName、group等信息组装成实体 InstanceRequest
this.serviceRegistry.register(getRegistration())
本地缓存 serviceDataIndexes 中存在直接取出来返回
for (Service each : client.getAllPublishedService())
添加clientId到ClientManager的clients中
连接数量过载检查-移除数量过多的客户端连接ejectOverLimitConnection()
Grpc同步实例请求(Distro协议)
ClientServiceIndexesManager#onEvent
执行client的初始化逻辑
客户端心跳检查
EphemeralIpPortClientFactory
System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout
为了提升性能,临时实例使用Grpc与NacosServer通信,持久化实例仍然使用http。因为持久化实例用的非常少,所以并没有进行大多重构
InstancesChangeNotifier#onEvent
Grpc订阅服务请求
同步实例请求处理(Distro协议)DistroDataRequestHandler#handle
从Client里拿出 subscribers(客户端所订阅的服务) 进行遍历
EphemeralIpPortClientManager#clientDisconnected
构建DistroKey对象,用于异步任务使用
服务 注册/注销/订阅/取消订阅 事件处理
processTasks()
distroProtocol.onReceive(distroData)
服务实例查询请求ServiceQueryRequest
registerService
DistroClientDataProcessor#onEvent
服务变化事件
订阅/取消订阅服务请求处理SubscribeServiceRequestHandler#handle
Nacos通信相关类包:com.alibaba.nacos.client.naming.remote
Grpc注册服务实例请求
实现
@Bean
移除订阅表中的关系
NacosRuntimeConnectionEjector#doEject
handleClientOperation((ClientOperationEvent) event)
核心类NacosNamingService
DistroProtocol#sync
EphemeralClientOperationServiceImpl#subscribeService / #unsubscribeService
onResponse
processor.process(task)
NamingClientProxyDelegate#registerService
订阅服务请求SubscribeServiceRequest
1、从connectionManager中拿到所有的Connection对象2、挨个检查Connection心跳是否过期3、将过期的Connection添加到一个集合4、对过期集合中的Connection主动发起一次探活请求5、探活失败的Connection执行注销操作
connectionManager.unregister(outDateConnectionId)
服务注册
switch (request.getType())
or (Service each : services)
NamingClientProxyDelegate#subscribe
NacosDelayTaskExecuteEngine#addTask
ClientReleaseEvent客户端释放事件
case ADD、CHANGE客户端 添加/改变
result = emptyServiceInfo(service)
subscriber = client.getSubscriber(service)
Distro延时任务处理器DistroDelayTaskProcessor#process
getAllInstancesFromIndex(singleton)
探活成功添加客户端到 successConnections
HealthCheckResponsibleInterceptor#intercept
false本地缓存中没有
ServiceInfoHolder#processServiceInfo
remove = this.connections.remove(connectionId)
获取推送的目标Client未指定时推送给服务下全部订阅者
判断服务是否有变化(实例列表)
NacosServiceRegistryAutoConfiguration
service就是要订阅/取消订阅的服务封装对象subscriber 就是客户端的封装对象
UpdateTask#run
result.add(instance); clusters.add(instance.getClusterName())
distroData.setType(DataOperation.DELETE)
register()
doExecute
服务变更事件
put订阅关系到订阅表中
NacosNamingService#selectInstances
创建DistroData对象作为Distro协议同步请求的数据载体
switch (distroData.getType())
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event)
true服务有变化
RpcPushService#pushWithCallback
instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each)
NamingGrpcClientProxy#queryInstancesOfService
内存中获取服务ServiceStorage#getData
ServiceChangedEvent服务变更事件处理
connection.freshActiveTime();successConnections.add(outDateConnectionId);
客户端注册事件
服务发现
MetricsMonitor.decrementInstanceCount()
ClientReleaseEvent
NamingSubscriberServiceV2Impl#onEvent
ClientDeregisterServiceEvent
syncToAllServer((ClientEvent) event)
EphemeralIpPortClientManager#syncClientConnected
if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME)
根据service从服务记录表 publisherIndexes 删除Client
ClientDisconnectEvent客户端断开事件
return serviceInfoMap.get(key)
Nacos延时任务处理引擎NacosDelayTaskExecuteEngine
客户端订阅服务事件
注销客户端连接
任务最终包装成了PushDelayTask放到了TaskExecuteWorker中的阻塞队列中:BlockingQueue<Runnable> queue然后通过内部类 TaskExecuteWorker@InnerWorker#run作为消费线程处理任务,执行Task的run方法
订阅表 新增/删除 Service与代表其的ClientIdsubscriberIndexes.get(service).add(clientId) / .remove(clientId)
参数开启后生效nacos.naming.expireInstance
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy
client.release()
构造方法中调用init()对属性 clientProxy 创建对象
从 connectionManager 中拿到所有的Connection对象,挨个判断是否心跳超时
System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout
case DELETE:
DistroDataRequest同步实例请求(Distro协议)
ConnectionManager#unregister
Client健康检查任务ClientBeatCheckTaskV2#doHealthCheck
NamingGrpcClientProxy#subscribe
1、将服务信息放入本地缓存:serviceInfoMap2、判断服务下实例列表是否有变化,如有变化, 2.1、发布事件:InstancesChangeEvent 2.2、将服务信息写入磁盘 {user.home}/nacos/naming/{namespace}
AbstractAutoServiceRegistration
instanceInfo = getPublishInfo(instance)
request.isSubscribe()
MetricsMonitor.incrementInstanceCount()
创建延时任务,异步执行数据同步
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove)
将实例信息添加到publishers
return new LinkedList<>(result)
remove.close()
1、从本地集合 connections 中移除2、将clientIp对应原子统计类的连接统计数量减一3、执行Connection的close方法,关闭channel4、通知监听ClientDisConnected事件的监听者
clientId 就是connectionId:客户端与服务端建立连接时的连接Id在ClientManager中,有个集合保存了所有客户端连接:span style=\"font-size:inherit;\
客户端注销事件
NacosNamingService#registerInstance
创建Client对象
distroData = new DistroData()
getExecuteClientProxy
Grpc请求推送变化给订阅者
for (Service each : services)
distroData.setType(DataOperation.CHANGE)
namingService = namingService()
outDatedConnections.add(client.getMetaInfo().getConnectionId()
handleSyncData(request.getDistroData())
0 条评论
回复 删除
下一页