微服务源码汇总
2022-08-25 15:06:34 0 举报
nacos-注册中心、配置中心 sentinel
作者其他创作
大纲/内容
chain.exit
作为初始化@Bean参数
instanceController.beat
cache.md5 = md5; cache.lastModifiedTs = lastModifiedTs; NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
通过nacos控制台修改配置发布POST 请求 /v1/cs/configs
SlotChainProvide.newSlotchain()
next
为每一个dataID注册监听器
逐个调用slot校验链条里的每个校验规则的退出exit逻辑
@PostConstruct
null
2.同步集群其它节点
service.init()
发布ApplicationReadyEvent事件
serviceRegistry.register(this.getRegistration)
springboot 提供的加载配置文件的接口
这个添加的拦截器会拦截所有的WEB请求的URL
如果断路器是半打开状态currentState.get() == State.HALF_OPEN
获取新的参数值,并和之前的比较,找出变化的参数值
调用退出
新增配置
client
本地没取到,远程调用配置中心获取配置信息
SmartApplicationListener
调用异常数及比例判断逻辑
false查本地磁盘文件
拦截把stock-service换成真实的IP
this.registerNacosListenersForApplications()
如果断路器是关闭状态currentState.get() == State.CLOSE
DistroProtocol
1将实例数据更新到内存注册表
定制RestTemplate(将LoadBalancerInterceptor封装到RestTemplate的拦截器集合里去)
statisticSlot.exit
serviceManager.registerInstance
key 判断 是否临时数据
抽取SYSTEM,JNDI,SERVLET之外的所有参数变量
NacosAutoServiceRegistration
本节点是否leader
计算当前时间窗口的统计数据
serviceManager.removeInstance
@Bean
spring容器启动refresh()
抛BlockException
抛业务异常Throwable会调用处理注解属性fallback方法
degradeSlot.exit
PushService.onApplicationEvent(ServiceChangeEvent event)
if (instance.isEphemeral()){添加一个延时的心跳任务BeatTask}
Instannce(order)127.0.1.1
Ribbon原理
NacosConfigBootstrapConfiguration
将service的全量实例instance写入内存注册表
DataChangeTask#run
int listenerSize = cacheMap.size(); int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize())
服务端返回接受的心跳信息null,NamingResponseCode.RESOURCE_NOT_FOUND重新注册
logSlot
standalone部署,并且使用的是内置数据库derby
createEmptyService()
增加抛出非限流Block异常调用数
Fegin原理
服务发现
SH-cluster
注册
rt maxAllowedRt
node.increaseThreadNum()
刷新容器中@RefreshScope 标记的类
Instannce(stock)127.0.1.1
handFallBack
InnerWorker.run()
Instannce(pay)127.0.0.1
long currentMaxId = persistService.findConfigMaxId();
DistroConsistencyServiceImpl.init()
first
MetricsHttpAgent#httpGet
HZ-cluster
1.事件是否是ApplicationReadyEvent
是否临时实例ephemeral: true(默认) 是 存内存false 否 存文件
正常情况,则增加当前调用现场数
HttpClient.asyncHttpPostLarge
SentinelWebAutoConfiguration
queue.take()
SpringApplication#run()
集群模式下轮询获取一个服务节点调用
再次比较md5返回
初始化
长轮询
NacosRestTemplate#get
listeners.running(context)
获取配置中心服务ConfigService
通知订阅者
dumpAllProcessor.process(new DumpAllTask())
bind(event)
新节点启动后,从集群其它节点一次性同步数据
SentinelWebAutoConfiguration实现接口WebMvcConfigurer容器启动时会调用其实现方法addInterceptor
NacosConfigService(Properties properties)
sentinel
调用service的实例心跳接口(HttpMethod.POST)
cacheMap放着需要刷新的配置,已3000为一个组
isRefreshEnabled()
fireEntry
源码入口
NacosFactory.createConfigService
实现ApplicationListener接口的类spring容器启动时会调用处理事件的方法
刷新记录
断路器再次打开,按配置的熔断时长更新断路器的熔断周期
资源降级规则
@LoadBalanced
@SentinelResource
prepareEnvironment
isFixedPolling()
chain.entry
SystemSlot
RaftCore
扫描FeginClient注解的类注入spring容器
ServiceManager.init()
Group交易微服务分组
handle(pair)
慢调用比例如果超过配置阈值将断路器打开
NacosRegistration
Instannce(EMS)127.0.1.2
将FeginClient注解的类的信息交给工厂bean代理类并将代理类的定义注册到spring容器
loadExtConfiguration(composite)
spring-cloud-starter-alibaba-nacos-config.jar里的spring.factories文件里
集群节点同步
异步更新内存注册表
调用真实实例接口
DumpService
distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey())
用于分页每次取1000条数据,刷进磁盘和内存
Instannce(order)127.0.0.2
ConfigController#getConfig
对资源逐条校验每个规则
LoadBalancerClient
LoadBalancerInterceptor
hostReactor.getServiceInfo
startDistroTask()
allinstances.addAll(persistenInstances)allinstances.addAll(ephemeralInstances )
nacosConfigManager.getConfigService()
优先使用本地配置
GlobalExecutor.submitLoadDataTask
实现
检查本地配置(容错)
每10s执行一次
canPassCheck
rt = maxAllowedRt
refreshContext(context)
DistroSyncChangeTask.run()
如果超过15s没有收到心跳将Healthy属性设置false
ApplicationListener
notifier.run()
raftStore.write(datum)
加载nacos配置中心的配置信息
扫描EnableFeginClients标签里信息并注册
2.同步实例信息到nacos server 集群其它节点
添加服务健康检测任务
dumpChangeProcessor.process(new DumpChangeTask())
LoadBalancerFeginClient
doSrvIpxt
获取
获取的结果保存到本地
ClientWorker.LongPollingRunnable#run()
Instannce(EMS)127.0.1.1
return
loadNacosDataIfPresent
2.事件是否是RefreshEvent
再尝试调用一次请求
end
对变化的配置,调用监听器处理
NamingService. registerInstance
Instannce(EMS)127.0.0.2
/distro/datum
Service库存服务
NameService. getAllInstances()
断路器关闭currentState.get() == State.CLOSE
同步写实例数据到文件
RestTemplate
extends
cacheData.checkListenerMd5()
对注册的代理工厂bean调用getObject()方法获取真实的代理类
PropertySourceBootstrapConfiguration#initialize
/instance/beat
@AspectSentinelResourceAspect
checkLocalConfig(cacheData)
持久化配置到mysql
Instannce(stock)127.0.0.1
NacosContextRefresher
客户端工作类
延迟29s执行
ApplicationContextInitializer
service.processClientBeat(clientBeat);
ExternalDumpService#init
CtSph.entryWithPirority
将实例数据更新到内存注册表
逐个调用slot校验链条里的每一个校验规则的entry逻辑
加载配置合并到CompositePropertySource
distroProtocol.sync
DistroLoadDataTask.run()
往阻塞队列tasks放入注册的实例数据
重置慢调用和总调用次数为0
调用service的服务发现接口(HttpMethod.GET)传入的参数里有客户端的udp端口,这个是方便服务端实例有变化了通过udp的方式同步给客户端
发布RefreshEvent事件,监听器RefreshEventListener
直接返回客户端NamingResponseCode.RESOURCE_NOT_FOUND
NacosConfigManager#createConfigService
第一次调用服务接口时根据服务名去服务端获取
慢调用比例降级逻辑判断
分页查询
单机限流
获取当前时间的统计时间窗口
implement
implements
增加时间窗口里的请求通过数
AbstractAutoServiceRegistration
/instance
NacosPropertySourceLocator
配置写入磁盘
监听事件变化,回调listener
向nacos服务端发送长连接30s超时返回有变化的dataId
instanceController.register
rt maxAllowedRt如果请求响应时间超过最大rt时间把慢调用次数加1
校验资源系统规则
nacos服务注册中心
利用CountDownLatch实现一个简单的raft协议,必须半数以上的节点写入成功才会返回给客户端
把原来的environment里的参数,放到一个新的Spring context容器下重新加载,完事后关闭新容器,这里就获取参数的新值了
发布环境改变事件,带上变化的参数值
监听ApplicationReadyEvent事件
返回注册时写入得到实例属性
spring容器初始化过程中bean初始化后置处理过程中对AOP的增强处理
SentinelWebInterceptor
FlowSlot.entry
BlockException
ConfigCacheService.dump
NameSpace(Dev)
Instannce(stock)127.0.1.2
DistroTaskEngineHolder
spring-cloud-alibaba-nacos-discovery.jar里的spring.factories文件里EnableAutoConfiguration对应的NacosServiceRegistryAutoConfiguration
putService(service)
NacosConfigProperties
校验资源授权规则
Instannce(pay)127.0.1.2
while (lastMaxId < currentMaxId)
onApplicationEvent
从时间窗口数组里获取依据当前时间计算的下标对应的old时间窗口
定时执行
同步数据到集群其他节点
Group仓库微服务分组
ClientLongPolling clientSub = iter.next()
allSubs.add(this)
如果超过30s没有收到心跳剔除改实例
校验通过,继续触发slot链上的下一个规则
counter.slowCount.add(1)
distroMapper.responsible(serviceName)
DistroController.onSyncDatum
refreshEnvironment()
从内存Map获取当前服务,没有创建
排队等待
SpringApplication#run
用RestTemplateCustomizer定制所有的RestTemplate
ProcessRunnable.run()
nacos更新注册表内存方法里,为了防止读写并发冲突,大量运用copyOnWirte思想。具体做法就是把原内存结构复制一份,操作完最后再替换回真正的注册表内存里去Eureka防止防止读写并发冲突用的是注册表的多级注册缓存结构,只读缓存,读写缓存,内存注册表,各缓存之间定时同步,客户端感知的及时性不及nacos
发布服务变化的事件
GlobalExecutor.submitDistroNotifyTask(notifier)
task.run
NacosServiceRegistry
有一个被@LoadBalanced注解的RestTemplates(装了有一个被所有@LoadBalanced注解的RestTemplate)
初始化的构造函数里会,订阅了LocalDataChangeEvent事件
RestTemplateCustomizer
不通过
否持久话数据阿里CP模式Raft协议
handle((ApplicationReadyEvent) event)
//增加总调用次数slowCount+=counter.slowCount.sum();totalCount+=counter.totalCount.sum();
lookProcessChain
如果old时间窗口为空,则新生成一个时间窗口,并放入时间窗口数组里。
deleteIp
获取资源相关的校验规则
令牌桶算法
span style=\"font-size: inherit;\
Instannce(order)127.0.1.2
DefaultProcessorSlotChain
stock-service2
load()
ConfigController#publishConfig
将新注册实例加入对应服务service的实例列表中去
存在不一样的直接返回
ClientWorker
PropertySourceLocator#locateCollection
pair = tasks.take()
BeatTask
否
int target = distroHash(serviceName) % servers.size()
/v1/cs/listener
nacos Serverstock-service192.168.0.1:8001192.168.0.1:8001
LongPollingService#LongPollingService()
AuthritySlot
distro/datums
addInterceptors
Server
调用service的实例同步接口(HttpMethod.PUT)
线程从tasks里取任务执行
checkFlow
readTimeoutMs 45sheaders参数Long-Pulling-Timeout 30sLong-Pulling-Timeout-No-Hangup true(首次,后面false)
node.addPassRequest(count)
processor.process(task)
curThreadNum.incrementAndGet()
添加监听器
调用Feign的接口通过代理类最终调用LoadBalancerFeginClient的execute方法
校验资源流控规则
是
serviceRegistry是NacosServiceRegistry的实例
如果依据当前时间计算出的时间窗口起始时间大于old时间窗口起始时间则将old时间窗重置,变为当前时间应该落入的时间窗口
NacosRestTemplate#postForm
微服务调用关系
开启一个任务ClientBeatProcess,更新客户端实例最后更新时间
consistencyService.put
ribbonstock-service192.168.0.1:8001192.168.0.1:8001
以Udp方式将服务变动通知发给订阅的客户端
如果实例不存在重新注册(例:网络不通导致实例在服务端被下线或服务端重启临时实例丢失)
SPI加载slot chain加载sentinel-core包下META-INF/services里对应文件的类
ServerStatusReporter.run()
加载顺序:1 文件名(微服务名称)2 文件名.文件扩展名3.文件名-profile.文件扩展名有相同的配置项,后面加载的会覆盖前面加载的
beatReactor.addBeatInfo
NacosConfigAutoConfiguration
serviceProxy. seadBeat(beatInfo)
全量dump配置信息
true查数据库
增加规则校验通过调用数
同步的发起机器就是做健康检查任务的那台机器
和历史对比Md5不相同
NameProxy.syncData
addConfigFilesToEnvironment()
增加当前调用线程数
handFailedTask()
根据变化的dataId向nacos服务端获取配置数据,更新本地快照
加载共享配置文件sharedConfigs
allSubs.remove(ClientLongPolling.this)
NacosConfigManager
客户端长轮询,出现异常,延迟2s执行
RefreshEventListener#onApplicationEvent()
SystemSlot.entry
滑动时间窗
调用service的实例注册接口(HttpMethod.POST)
配置中心属性配置类,对应bootstrap.yml
获取配置数据
获取当前时间对应的时间窗口
比较md5,不一样的记录在changedGroup
new DistroDelayTaskExecuteEngine()
在spring-cloud-context包中实现,通过SPI机制加载:SpringApplication初始化过程中会通过SpringFactoriesLoader加载ApplicationContextInitializer类型的实现类到initializers中
Service物流服务
注册服务实例信息在集群节点间同步
不成功重试
加载当前应用配置
nodeSelectSlot
SentinelWebInterceptors.preHandle
当前时间对应的时间窗口
调用完处理断路器状态
定时获取客户端最新服务数据更新到本地
是否刷新配置,默认true
Instannce(pay)127.0.1.1
instanceController.list
计算当前时间窗口数组下标
nacos配置中心
Service支付服务
Instannce(EMS)127.0.0.1
构建资源的slot校验链条,每个资源都有自己独立的校验链条有点类似Netty的pipeline
lastMaxId = id > lastMaxId ? id : lastMaxId;
degradeSlot
否则,执行长连接任务
init()
将临时的注册实例更新到cluster的ephemeralInstances 属性上去,服务发现查找临时实例最终从内存里找到的就是这个属性
迭代allSubs队列
registry.addInterceptor
集群节点之间相互同步节点状态。如果有节点宕机了,集群其他节点会感知到并更新集群节点状态,这个会影响心跳任务机器选择的计算
LoadBalance()用jdk动态代理生成代理
内部有两个map,一个clusterMap,一个dataMap。1.取dataMap数据2.拿clusterMap实例健康状态,心跳信息更新实例3.添加新实例
缓存配置信息的Md5到内存中并发布LocalDataChangeEvent事件
statisticSlot
循环从阻塞队列tasks取数据处理
service为null
listener就是Service实例
MetricsHttpAgent#httpPost
this.getRegistration是NacosRegistration实例
OPERATE_LOCK.lock()
springboot 启动时调用PropertySourceLocator.locate(env)用来加载配置信息
LoadBalancerAutoConfiguration
subscriber.onEvent(event)
EventPublishingRunListener#running
start()
PropertySourceLocator
Instannce(pay)127.0.0.2
spring-cloud-start-alibaba-sentinel.jar里的spring.factories文件里SentinelWebAutoConfiguration
NamingProxy.getAllData(targetServer)
entry.exit()
用服务名称hash后对机器数取模,确定执行健康检查的机器
UpdateTask
checkConfigInfo()
counter.totalCount.add(1)
增加被规则限流的调用数
漏桶算法
startLoadTask();
clusterBuilderSlot
快速失败
NacosPropertySourceLocator#locate
拦截
抛Throwable
addConfigInfo
ClientBeatCheckTask
遍历集群中不包括自己的机器,向其发送同步数据的请求,只要有一台获取数据成功,就不再向其它机器请求
getPushService().serviceChanged(this)
instance = serviceManager.getInstance
rollingCounterInSecond.addPass(count)
触发后面的规则校验
Instannce(order)127.0.0.1
update
queue.put(task)
会更新lastRefTime为当前时间
FeignClientFactoryBean(代理Bean)
增量
@AutoConfigurationBefore
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress())
延时执行的定时任务更新客户端的服务缓存
总调用次数加1
nodeSelectSlot.entry
false(默认)
insert
RibbonAutoConfiguration
时间窗口里的请求数据统计放在一个counter数组里,不同的下标代表不同的数据统计指标
getDistroMapper().responsible(service.getName())
instance为空
ropertyUtil.isDirectRead()
调用service的实例注销接口(HttpMethod.DELETE)
GET请求
stock-service1
发布一个LocalDataChangeEvent事件
生成
查找oldConfigInfo更新当前配置保存旧的配置到历史记录
dumpConfigInfo(dumpAllProcessor)
LongPollingService.ClientLongPolling#run()
1.告诉客户端配置变更
instance.setHealthy(false)
Sphu.entry(resourceName)
clientSub.sendResponse(Arrays.asList(groupKey))
spring 构建bean的过程会执行带有@PostConstruct
ServerStatusReporter.init()
onApplicationEvent(ApplicationReadyEvent event)
加载配置中心配置文件bootstrap.ym
Spring的扩展点之一,在ConfigurableApplication Context#refresh()之前调用,通常用于需要对应用上下文做初始化的web应用中,比如根据上下文环境注册属性源或激活配置文件等
passLoaclCheck
clientBeat为空
如果总调用次数小于配置最小请求数,则不修改断路器状态,直接返回
继承
遍历查询结果
AuthritySlot.entry
executorService.execute(new LongPollingRunnable(i))
Instannce(stock)127.0.0.2
计算当前时间窗口的起始时间位置
FlowSlot
FeginAutoConfiguration
调用NacosConfigService的构造器
handBlockException
AbstractSharedListener#receiveConfigInfo()
如果old时间窗口起始时间跟依据当前时间计算出的时间窗口起始时间相同,则当前时间应该落在old时间窗内,直接返回old时间窗
lastMaxId=0
serviceProxy. registerService
ephemeralInstances = toUpdateInstances
putServiceAndInit(service)
如果feignClient没有配url
查询mysql获取最大主键值
抛规则校验BlockException会调用处理注解属性blockHandle方法
/v1/cs/configs
ConfigController#listener
心跳周期5s间隔
响应配置发生变化的key
NacosRefreshHistory
Service订单服务
instance.setLastBeat(System.currentTimeMillis())
SlotChainBuilder.build
断路器打开currentState.get() == State.OPEN
/instance/list
listener.receiveConfigInfo(contentTmp)
RibbonRequest
MetricsHttpAgent
写入磁盘
registerFeignClient
最终整合ribbon去实现负载均衡
nacos这种推送模式,对于Zookeeper那种通过tcp长连接来说会节省很多资源,就算大量节点更新也不会让Nacos出现太多的性能瓶颈,在Nacos中客户端如果接受到了udp消息会发送一个Ack,如果一定时间内nacos-server没有收到Ack,那么还会进行重发,当超过一定时间后,就不在重发了,虽然通过udp不能保证真正的送达订阅者,但是Nacos还有定时轮询作为兜底,不需担心数据不会更新的情况。Nacos通过这两种手段,既保证了实时性,有保证了数据更新不会漏掉
ServiceReporter.run()
集群节点状态同步
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
NacosPropertySourceBuilder#loadNacosData
/raft/datum/commit
statisticSlot.entry
如果超过熔断的时间周期将断路器改为半开状态,尝试着调用一次请求
负责收集资源的路径,并将这些资源的调用路径以树状结构保存起来,用于根据调用路径来限流降级
PropertySourceLoader
SELECT max(id) FROM config_info
增加调用完成数和调用执行执行时间RT减少当前调用线程数
NotifyCenter.publishEvent(event)
currentState.get() == State.OPEN
OPERATE_LOCK.unlock()
用来向Nacos server发请求的代理
true
extract(this.context.getEnvironment().getPropertySources())).keySet()
AsyncNotifyService#onEvent
spring-cloud-start-alibaba-sentinel.jar里的spring.factories文件里SentinelAutoConfiguration
degradeSlot.entry
加载扩展配置文件extensionConfigs
Nacos集群启动新节点时向其它节点拉取数据同步流程
loadSharedConfiguration(composite)
prepareContext
order-service
Warm up
run方法
restTemplate.getForObject(stock-service)
。。。。
worker.process(task)
handle((RefreshEvent) event)
申请一个entry,如果能申请成功,则说明没有限流否则抛出BlockException
raftProxy.proxyPostLarge(将服务转发到集群的leader节点)
nacos clientstock-service192.168.0.1:8001192.168.0.1:8001
RibbonLoadBalancerClient
。。。。。
是阿里AP模式Distro协议
服务端的push模式,推送给客户端
服务注册
sendResponse(null)
0 条评论
回复 删除
下一页