nacos配置中心源码
2022-07-12 21:02:28 0 举报
naocs配置中心源码流程图
作者其他创作
大纲/内容
extends
执行lamda表达式
BootstrapApplicationListener先是这个监听器(优先级较高 - Order)
NotifyCenter.registerSubscriber(new Subscriber() {})lamda表达式处理ConfigDataChangeEvent事件
写入磁盘并更新缓存
服务端配置缓存
readTimeoutMs=45sheader参数:Long-Pulling-Timeout=30sLong-Pulling-Timeout-No-Hangup=true
NacosDelayTaskExecuteEngine
NacosContextRefresher.onApplicationEvent(ApplicationReadyEvent event)
ClientLongPolling clientSub = iter.next()
run()
${spring.application.name}${spring.application.name}.${file-extension:properties}${spring.application.name}-${profile}.${file-extension:properties}
ConfigController @PostMappingpublishConfig
openEventHandler()
将配置刷新信息添加到刷新历史记录中
拿客户端请求过来的缓存配置
创建一个线程任务
addConfigFilesToEnvironment()
监听配置的变更从服务端拉取变更的dataid list
注册Subscriber处理LocalDataChangeEvent事件
extension-configs
locator.locateCollection(environment)
Thread
cache.addListener(listener)
loadExtConfiguration(composite)
发布事件事件处理会去请求集群中每台机器去添加任务
listenerWrap.lastContent = content
刷新容器中标记了@RefreshScope的bean
NotifyCenter.registerSubscriber(new Subscriber() {})lamda表达式处理LocalDataChangeEvent事件
ContextRefresh
这个构造方法初始化了好多东西,得好好看看
queue.offer(event)
长轮询任务用来更新本地配置
CacheData
NotifyCenter.publishEvent(event)
SmartApplicationListener
queue.poll()
2. bean初始化
ConfigFileApplicationListener---> ConfigFileApplicationListener#load
刷新RefreshScope里面的bean实例
@Bean
spring-cloud-starter-alibaba-nacos-config-2.2.5.RELEASE.jar!\\META-INF\\spring.factoriesNacosConfigBootstrapConfiguration
AsyncTask.run()
CacheItem
更新服务端的缓存
DataChangeTask.run()
变更的配置触发事件监听器来处理
loadSharedConfiguration(composite)
ApplicationContextInitializer.initialize(context)
执行延迟的定时任务
client
发布事件监听bootstrap.properties先加载然后是application.properties
executorService.execute(new LongPollingRunnable(i))
processTasks()
2. 再加载application.properties
server主动推送变更配置的dataid给到client
插入数据库的config_info表
processor.process(task)
用于向nacos server发起请求
1. ApplicationReadyEvent事件处理
listener.receiveConfigInfo(contentTmp)
ExternalDumpService
ApplicationListener
prepareContext
listeners.environmentPrepared(environment)
1. 推断构造方法创建原始对象
this.refresh.refresh()
配置信息
EventPublishingRunListener.running(ConfigurableApplicationContext context))
refreshEnvironment()
clientSub.sendResponse(Arrays.asList(groupKey))
for (String groupKey : changedGroupKeys) { 遍历有配置变更的groupKey 从服务端拉取这个groupKey的配置更新本地配置文件和缓存配置}
配置的内容发生变更当前CacheData的content和上次的content不一致if (!md5.equals(wrap.lastCallMd5)) {}
构造方法注册一个DefaultPbulisher
cacheData.checkListenerMd5()
(AbstractConfigChangeListener) listener).receiveConfigChange(event)
获取配置数据
构造方法
allSubs.remove(ClientLongPolling.this)
增量Dump配置
2. RefreshEvent事件处理
createConfigService(this.nacosConfigProperties)
获取新的参数值,并和之前的进行比较找出改变的参数值
PropertySourceLocator#locateCollection(Environment)
DefaultSubcriber#notifySubscriber
ConfigController@GetMappinggetConfig
removeTask(taskKey)
客户端的配置数据还有ip都存在了这个ClientLongPolling对象中了
加载配置到CompositePropertySource优先级是上到下递增的啊后面执行的配置覆盖掉前面的配置
handle((RefreshEvent) event)
拿到有变更配置的dataid返回给客户端
点进去看看,有个三目运算符
子流程
构造方法注入
listener instanceof AbstractConfigChangeListenerlistener是AbstractConfigChangeListener的子类才会执行下面的这个监听任务,暂时没找到用的地方,或许开发者自己可以扩展
applyInitializers(context)
执行监听器的lamda表达式
开启守护线程执行任务
这个i就是taskId,创建了两个对象(执行两个线程任务),第一个对象的taskId为0,第二个对象的taskId为1 ...
NacosContextRefresher
@ServiceAsyncNotifyService
persistService.findConfigMaxId()
builder.run() -> SpringApplication.run()
发布事件ApplicationEnvironmentPreparedEvent1. 先加载bootstrap.properties
从缓存配置cacheMap中拿该groupKey对应的CacheData,然后更新配置数据
查询数据库
/v1/cs/configs//listener
发布配置
发布环境变更事件并带上改变的参数值
发布配置后集群的每台机器都会请求到将dump data添加到tasks中用于增量dump的任务去调用
@PostConstructinit()
cacheMap中缓存这需要刷新的配置,将cacheMap中的数量以3000分一个组(Math.ceil(cacheMap.size() / 3000) - 向上取整),分别创建一个LongPollingRunnable用来监听配置更新
源码入口
将数据库的数据存磁盘并更新缓存
server
DumpConfigHandler.configDump(build.build())
this.context.publishEvent(new RefreshScopeRefreshedEvent())
读服务端的磁盘文件
将数据库中config_info这张表数据查询出来写到服务端磁盘
LocalDataChangeEvent
1. 优先获取本地数据
client的长轮询去获取server最新数据,以及server数据变更后会主动通过长轮询的通道push数据给client来确保client本地数据是最新的
executeAsyncInvoke()
listeners
添加到队列
ClientLongPolling.run()
for (Listener listener : listeners) { 往CacheData中添加监听器 }
PropertySourceBootstrapConfiguration#initialize
在spring-cloud-context包中实现,通过SPI机制加载SpringApplication初始化的过程汇总会通过SpringFactoriesLoader加载ApplicationContextInitializer类型的实现类到initializers中
@ConfigurationProperties(spring.cloud.nacos.config)对应配置文件的一个属性配置类
dumpConfigInfo(dumpAllProcessor)
checkLocalConfig(cacheData)
if (cacheData.isUseLocalConfigInfo()) { 缓存配置使用了本地配置则去检查配置是否有变更,有则去更新}
CommunicationController@GetMapping(\"/dataChange\")notifyConfigInfo
1. 检查本地配置(容错配置)for (CacheData cacheData : cacheMap.values()) { 检查本地配置(容错配置) 更新cacheData使用最新的本地配置 没有本地配置则设置isUseLocalConfig属性false}
/v1/cs/configs
每隔100ms执行任务控制台发布配置后服务端就是这个任务去存磁盘更新缓存的
注册一个Subcriber去处理ConfigDataChageEvent事件
task
agent属性new MetricsHttpAgent(new ServerHttpAgent(properties))
CacheData.setContent(ct[0])
客户端配置缓存
shared-configs
NacosConfigService可以认为就是client和NacosNamingService同级
ConfigController@PostMapping(\"/listener\")listener
ClientWorker
RefreshEventListener#onApplicationEvent
NacosDelayTaskExecuteEngine#addTask
Subscriber#onEvent()
event
subscriber.onEvent(event)
查询出config_info表当前的最大主键id的值用于全量dump完后跳出循环
2. 配置没有变更执行ClientLongPolling任务
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey))
listeners.running(context)
执行任务
allSubs.add(this)
listenerWrap.lastCallMd5 = md5
2. 没有本地数据则去服务端拿配置数据
长轮询主要4个步骤:1. 检查本地配置,如果存在本地配置,并且与缓存的本地配置数据不一样,把本地配置内容更新到缓存,并触发事件2. 向nacos config server发出一个长连接,30s超时,nacos config server会返回有变化的dataid list3. 根据变化的dataid,从server拉取最新的配置内容,并更新本地快照和缓存4.对有变化的配置触发事件监听器来处理
checkConfigInfo()
locate(Environment env)
NacosRefreshHistory
发布事件
handle((ApplicationReadyEvent) event)
lamda表达式创建匿名类AbstractSharedListener然后调用ConfigService添加监听器
do long-polling
1. 将当前ClientLongPolling对象添加到队列中虽然这行代码在异步任务的下面但还是先执行的
分页每次查询出1000条配置
@ServiceLongPollingService
NotifySingleTask里的url看下
更新当前CacheData监听器的lastCallMd5为最新的md5
receiveEvent(event)
extract(this.context.getEnvironment().getPropertySources())
dumpAllProcessor.process(new DumpAllTask())
立即执行任务
添加缓存配置
两个线程池:1. 只有一个线程的线程池,用来执行定时任务,每隔10ms执行一次checkConfigInfo()2. 线程数等于处理器个数的线程池,用来执行LongPollingRunnable
MD5Util.getClientMd5Map(probeModify)
客户端和服务端配置数据md5比较
2. 异步任务延迟29.5s执行
new TaskManager(\"com.alibaba.nacos.server.DumpTaskManager\")
SpringApplication#run
this.registerNacosListenersForApplications()
EventPublishingRunListener.environmentPrepared(ConfigurableEnvironment environment)
全量Dump配置数据集群每台机器自己去查数据库的数据写入磁盘并更新缓存
2. 检查服务端配置,有变更则更新本地配置
NacosConfigService(Properties properties)
比较服务端客户端配置数据
nacosConfigManager.getConfigService()
init
客户端缓存配置数据和服务端的配置数据通过md5方式比对不一致的话就是收集到list最后返回给客户端
queue.take()
abstract DumpService
sb拼接了当前缓存配置的信息
super.destroy()
遍历容器中的ApplicationContextInitializer
TaskManager.super.processTasks()
执行lamda表达式的onEvent方法for (Member member : ipList) {遍历集群节点添加配置信息到队列要去给集群中的每台机器发请求}
sendResponse(changedGroups)
遍历allSubsallSubs.iterator()拿到ClientLongPolling对象
ConfigDataChangeEvent
执行lamda表达式的onEvent方法
写入磁盘
NacosConfigProperties
将配置信息封装到ConfigInfo对象中
LongPollingRunnable.run()
将获取到的配置添加到环境变量中这个propertySources是environment.getPropertySources()拿到的
清除scope里面的缓存,下次就会重新从BeanFactory获取一个新的实例(该实例使用新的配置)
反射
NacosConfigManager
ConfigCacheService
1. 数据有变更将groupkey收集到list相应给客户端
集群的每台机器都会去请求包括自己请求自己这台机器
从配置中心获取配置
RefreshScope.refreshAll()
将从服务端获取的配置数据保存客户到本地
通过反射创建NacosConfigService双重检测防并发保证NacosConfigService对象只创建一次
新建一个CacheData对象添加到缓存配置(cacheMap)中,并设置一些初始值。- 设置长轮询的taskIdcacheData.setTaskId(taskId)- 这个用途不清,暂不详细的抠lastCacheData.setInitializing(true)
不是去查mysql,而是去查本地磁盘的缓存,所以直接修改mysql配置是不行的,修改配置需要发布ConfigDataChangeEvent的事件,触发本地文件和内存的更新
setHealthServer(true)
/v1/cs/communication/dataChange
DefaultSubcriber
监听器接收事件并处理
DumpProcessor#process
ProcessRunnable.run()
new Runnable() {@Override public void run() {}}
NacosPropertySourceLocator
把原来的environment里面的参数放到一个新建的spring容器下重新加载,加载完后关闭新容器,这里就是获取参数的新值了
PersistService查数据库的
0 条评论
下一页