sentinel源码流程图
2024-01-02 10:15:01 1 举报
Sentinel源码流程图展示了其核心功能的实现过程。首先,用户通过调用Sentinel的API创建资源,如Redis、Nacos等。接着,Sentinel将这些资源信息注册到自身的资源管理器中。当资源发生异常时,Sentinel会捕获异常并进行处理,如限流、降级等。同时,Sentinel会将异常信息上报给控制台,以便用户查看和分析。此外,Sentinel还支持动态配置规则,用户可以根据需要调整限流、降级等策略。最后,Sentinel会定期检查资源的状态,确保资源正常运行。整个流程图展示了Sentinel如何实现对资源的监控、保护和自动恢复,为用户提供稳定可靠的服务。
作者其他创作
大纲/内容
context.getCurEntry().setBlockError(e)
这块还是有点问题的,应用做个集群的话只能推送给其中一台机器,需要先获取应用的所有机器然后再遍历推送到每个机器中
在这块源码,我进行了改造(sentinel 服务端重启后需要每个规则列表点一下才能在内存中加载规则数据,以及 ids 重新从 0 开始导致新添加的规则因为 id 相同从而覆盖掉旧的规则)在这里从 nacos 获取应用的 sentinel 规则添加到服务端的内存中,同时将内存 ruleStore 中的 ids 的值更新为当前规则的最大 id
更新sentinel 服务端推送过来的限流规则到本地内存/数据源
ruleProvider.getRules(app)
参考文档:https://github.com/alibaba/Sentinel/wiki/%E5%AE%9E%E6%97%B6%E7%9B%91%E6%8E%A7setRules:服务端推送数据给客户端
与客户端进行通信
客户端信息添加到服务端缓存后面服务端去查询客户端数据以及推送更新各种规则需要用到这里注册进去的客户端信息
DefaultNode链路节点
next
FlowRuleManager.loadRules(flowRules)
circuitBreaker.onRequestComplete(context)
yaml 配置文件配置
@Bean适配springmvc
return old
nacos 修改配置数据
1000ms
当前调用线程数 -1
FlowSlot#entry
将原来的规则清空
sentinel-transport-simple-http
此处扩展点推送规则直接发布到 Nacos 配置中心
只要有一个断路器没有校验通过那么就会抛出异常DegradeExceptionextends BlockException
sentinel dashboard
注册
null
SmartInitializingSingleton
commandCenter.start()
FactoryBean<NacosDataSource>
增加被规则限流的调用数
用于存储资源的统计信息以及调用者信息,例如该资源的RT,QPS,thread count 等等,这些信息将用作为多维度限流,降级的依据
for (ManagerListenerWrap wrap : listeners)
node.increaseThreadNum()
校验资源降级规则核心是断路器
AbstractDataSource
afterSingletonsInstantiated()
if (curCount + acquireCount > count) { return false;}return true;
super(NacosDataSourceFactoryBean.class.getName())
sentinel 源码改造将规则发布到 Nacos 配置中心
double currentRatio = slowCount * 1.0d / totalCount; if (currentRatio > maxSlowRequestRatio) { transformToOpen(currentRatio); }
sentinel 服务端与客户端拉取、更新交互源码sentinel 持久化机制源码、整合nacos 实现持久化源码sentinel 源码改造 - 使用 Nacos 进行持久化
创建 NacosDataSource 对象每种 sentinel 限流规则都会创建一个 NacosDataSource
更新sentinel rules 到客户端本地内存
AppInfo#addMachine
NacosDataSource
断路器是打开状态
preHandle
SentinelConverterConfiguration
客户端启动的接口服务,提供给dashboard查询数据以及接收各种规则使用
ConfigService#getConfig
getServerSocketFromBasePort(port)
抛规则校验异常BlockException
存储到sentinel server 的内存中
客户端源码入口
fireEntry
SimpleHttpCommandCenter.getHandler(commandName)
this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray<>(sampleCount);
2. 正常情况下,再执行StatisticSlot增加当前调用线程数
first
AutoRefreshDataSource#startTimerService
抛BlockException异常entry赋值blockError增加blockQps数
rollingCounterInSecond.addSuccess(successCount)
每种 sentinel 限流规则都会创建一个 ConfigService
漏桶算法
推送规则到对应的客户端机器中
catch (Throwable ex)
单机限流
WindowWrap
执行每个slot的entry方法
WarmUpController#canPass
执行定时任务
rollingCounterInSecond.addRT(rt)
machines.add(machineInfo)
AopAutoConfiguration@EnableAspectJAutoProxy
sentinel客户端api
currentProperty:DynamicSentinelProperty<List<FlowRule>>在 static 静态代码块中会去添加一个默认的 ListenercurrentProperty.addListener(new FlowPropertyListener())
ApiCommandHandler:获取访问的api uri列表ModifyRulesCommandHandler:setRules - 更新规则......
Root
FlowRuleUtil.buildFlowRuleMap(value)
注册 NacosDataSourceFactoryBean 这个 FactoryBean 的 BeanDefinition每种 sentinel 限流规则都会创建一个 NacosDataSourceFactoryBean
DegradeRuleManager.getCircuitBreakers(r.getName())
WritableDataSourceRegistry#getFlowDataSource
从数据源加载sentinel rules
cb.tryPass(context)
end
ReadableDataSource
loadConfig(readSource())
addInterceptors(InterceptorRegistry registry)
CommandHandlerProvider.getInstance().namedHandlers()
500ms
ModifyRulesCommandHandler#handle
DispatcherServlet分发请求的时候会去执行前置拦截器
rt > maxAllowedRt
EntranceNode 1
BeanDefinitionBuilder .genericBeanDefinition(dataSourceProperties.getFactoryBeanName())
以QPS为例
value - MetricBucket
machineDiscovery.addMachine(machineInfo)
this.serverSocket.accept()
windowStart
bean初始化后置处理器进行aop增强
sentinel最核心的部分在客户端,sentinel-core这个包是给到了客户端
聚簇节点增加线程调用数
loadInitialConfig()
bizExecutor.submit(eventTask)
count:1prioritized:false
HALF_OPEN
NacosFactory.createConfigService(this.properties)
SentinelAutoConfiguration
1. 先获取可写入的数据源
sentinel 源码改造从 Nacos 配置中心获取规则
扩展自己的数据源
node.increaseBlockQps(count)
counter.slowCount.add(1)
断路器状态
存储到3个Map 中
calculateTimeIdx(timeMillis)
.............
StatisticSlot#exit
handleStateChangeWhenThresholdExceeded(rt)
node.passQps()
执行链路逐个调用slot校验链条里的每一个校验规则的entry逻辑fireEntry 方法会触发下一个规则的调用
客户端长连接通信
@see com.alibaba.csp.sentinel.datasource.FileWritableDataSource#write(java.lang.Object)客户端如何将规则写入到本地文件中?1. 引入sentinel-datasource-extension依赖2. spi 机制,实现InitFunc接口,实现init方法,直接用 demo 工程中的 {@link com.alibaba.csp.sentinel.demo.file.rule.FileDataSourceInit#init()}
implements
RateLimiterController#canPass
校验不通过抛FlowException extends BlockException
while(true)接收服务端数据
sentinel客户端应用与sentinel-dashboard控制台交互流程源码剖析
遍历list,将一个个ProcessorSlot添加到链路中
repository.saveAll(rules)
counters[event.ordinal()].add(n)
extends
将 前面创建好的 Listener 添加到ConfigService 中便于后面配置更新
exit的链路执行子流程
currentProperty.updateValue(rules)
此处扩展点规则直从 Nacos 配置中心获取规则
flowRules.putAll(rules)
@EnableConfigurationProperties
new Thread(serverInitTask).start()
this.clusterNode.addPassRequest(count)
Default Node A
0
校验通过,执行链路的下一个slot
一个资源不管是不是在不同的Context都只有一个ClusterNode,用来统计该资源在所有Context的聚合数据
ParamFlowSlot
StatisticSlotCallbackRegistry.getExitCallbacks()
cacheData.checkListenerMd5()
申请一个entry,如果能够申请成功,则说明没有限流,否则会抛出BlockException,表示已经被限流了
ProcessorSlotChain
断路器是关闭状态说明校验通过
一个时间窗格中存放的数据 - 各统计指标的计数器
FlowPropertyListener#configUpdate
链路执行完走finally会去走一遍链路的exit方法
this.beanFactory .getBean(dataSourceName)
spring: cloud: sentinel: datasource: flow-rules: nacos: server-addr: nacosIp:8848 dataId: ${spring.application.name}-flow-rules groupId: SENTINEL_GROUP data-type: json rule-type: flow degrade-rules: nacos: server-addr: 123.60.150.23:8848 dataId: ${spring.application.name}-degrade-rules groupId: SENTINEL_GROUP data-type: json rule-type: degrade param-flow-rules: nacos: server-addr: 123.60.150.23:8848 dataId: ${spring.application.name}-param-flow-rules groupId: SENTINEL_GROUP data-type: json rule-type: param-flow authority-rules: nacos: server-addr: 123.60.150.23:8848 dataId: ${spring.application.name}-authority-rules groupId: SENTINEL_GROUP data-type: json rule-type: authority
0 - array索引
这也是一个扩展点
machines.remove(machineInfo)
每种 sentinel 限流规则都对应有自己的NacosDataSourceProperties
重置当前时间窗格的慢调用和总调用次数
一个扩展点
loadConfig()
SentinelDataSourceHandler
校验资源系统规则
将 configListener 添加到 ConfigService 中
SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class)
flowDataSource 默认是 null赋值方法: WritableDataSourceRegistry#registerFlowDataSource使用时可参考 demo 工程中的:FileDataSourceInit#init()
SimpleMachineDiscovery#addMachine
StatisticSlot#entry
从指定的客户端机器中拉取规则
allRulesmachineRulesappRules
EntranceNode入口节点
为factoryBeanName 赋值
将规则发布到Nacos 配置中心
parseOrigin(request)
com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo
慢调用比例
调用链路
初始化 FactoryBean创建出 NacosDataSource
慢调用比例大于降级规则中配置的阈值则打开断路器
StatisticNode统计节点
计算当前时间戳所在时间窗口数组的下标
spring-cloud-starter-alibaba-sentinel.jar里的spring.factories里的SentinelAutoConfiguration
Default Node ...
遍历所有的监听器更新 sentinel 配置的规则到本地
从 nacos 数据源中读取 sentinel 规则配置
AOP 代理的方式
执行监听器的方法
ClusterBuilderSlot
订单微服务02
listener.receiveConfigInfo(contentTmp)
FlowControllerV1#apiQueryMachineRules
执行链路中的每个ProcessorSlot的exit方法StatisticSlot和DegradeSlot有逻辑
SystemSlot
服务端从客户端本地内存拉取 sentinel 的限流规则
主流的数据源 sentinel 都实现了,引入对应的依赖就行了(看 sentinel-extension 下的工程)
@BeanJson的处理
api调用
将客户端的应用和机器信息注册进 sentinel 服务端,同一个应用 app 名一样则机器信息会添加进该 app 的机器链中
@Bean这个bean是用来整合进RestTemplate的
NacosDataSourceFactoryBean#getObject
增加当前节点的线程调用数
DataSourcePropertiesConfiguration
增加时间窗格里的请求通过数
DefaultNode
updateNextRetryTimestamp()
写入本地文件
WritableDataSource#write
loadConfig()
执行目标对象方法
getProperty().updateValue(newValue)
HttpCommandUtils.getTarget(request)
getContextName(request)
appManagement.addMachine(machineInfo)
@CommandMapping(name = \"setRules\")
抛Throwable
avgUsedTokens(node)
客户端主动发送心跳信息给dashboard保活
LogSlot
LogSlot#entry
NacosDataSource.this.parser.convert(configInfo)
@SentinelResource
sentinel-core包下通过spi机制去加载ProcessorSlot并对所有加载到的ProcessorSlot进行排序@SpiOrder指定的值越小越靠前最后返回排好序的Lst<ProcessorSlot>
SentinelWebAutoConfiguration实现了WebMvcConfigurer,容器启动的时候会去调用其实现方法addInterceptors方法注册拦截器
super.addPassRequest(count)
initNacosListener()
ruleType 为 flow 为例
pointcut注解是@SentinelResourced的方法进行增强
ClusterBuilderSlot#entry
ResponseTimeCircuitBreaker
FlowRuleNacosPublisher#publish
这里是一个扩展点
EntranceNode ...
@Bean
/registry/machine
1. 先执行后面的规则校验
BlockException
创建一个 BeanDefinition 它的 beanClass为NacosDataSourceFactoryBean
this.clusterNode.increaseThreadNum()
StatisticSlotCallbackRegistry.getEntryCallbacks()
检查配置是否有变化
ServiceLoaderUtil.getServiceLoader(InitFunc.class)
根据commandName拿到对应的CommandHandler
StatisticSlot
子流程
resetStat()
从本地文件读取 sentinel 规则数据
ServerSocket监听8719端口
api调用(定时查询客户端访问情况,推送各种规则到客户端)
SentinelBeanPostProcessor
构建Context添加EntranceNode
OPEN则直接return
AtomicReferenceArray<WindowWrap<MetricBucket>>
rt > maxAllowedRt如果请求响应时间大于降级规则设置的最大rt时间慢调用次数 +1
构建指定资源的排好序的ProcessorSlot链路每个资源都有自己独立的chain
listener.configUpdate(newValue)
FileRefreshableDataSource
lookProcessChain(resourceWrapper)
当前qps如果大于count阈值返回false,校验不通过
订单微服务01
执行注解@SentinelResource属性blockHandler指定的方法
sentinel 整合 spring mvc 的方式
将客户端机器信息添加到该 app 应用信息的 machines 机器链 Set 中
发布规则到指定应用的指定机器
fromHalfOpenToClose()
1
NodeSelectorSlot
publishRules(entity.getApp())
断路器在OPEN状态下,经过一个熔断周期,断路器变为半开状态去重试一次请求的时间
根据 type 来区分是什么规则(流控、降级 ......)这里只拿流控规则来画流程图,其它规则都是一样的先将推送过来的流控规则反序列化为流控规则对象
new Listener() 匿名类
客户端
断路器半开状态一般都是尝试着调用了一次请求,如果这次调用响应时间依然超过最大rt阈值则将断路器打开rt > maxAllowedRt
获取当前资源对应的断路器资源的每个降级规则都会去创建一个断路器
NodeSelectorSlot#entry
SlotChainProvider.newSlotChain()
super.increaseThreadNum()
抛业务异常
AuthoritySlot
spi机制去加载SlotChainBuilder默认是DefaultSlotChainBuilder
return flowDataSource
这个链路建立的算法的理解
if (totalCount < minRequestAmount) { return; }
从当前时间窗口里取统计指标数据我这里以QPS为例子
commandCenter.beforeStart()
DynamicSentinelProperty#updateValue
dataSource.write(value)
\t@PostConstructinit() 如果spring.cloud.sentinel.eager设置为true那么在该自动配置类初始化的时候就会去和dashboard服务端建立通信而不用懒加载在第一次申请entry的时候才会去建立通信
ruleProvider.apply(resource.getName())
校验资源授权规则黑白名单的校验
增加规则校验通过调用数
root根节点是一个特殊的Entrance Node,存放在常量池(看Constants类中的ROOT常量),所有的Node以Root为根节点形成一个树状结构 --> 存放在常量池,这棵树挺厉害的,每个DefaultNode还会指向一个ClusterNode,这些Node的父类又是StatisticNode,因此具有了统计数据的能力
断路器状态改变
服务端源码入口
registerCommands(handlers)
callExitHandlersAndCleanUp(context)
从请求中拿到commandName
对这个数据结构的理解
http://localhost:8720/api
spring-cloud-starter-alibaba-sentinel.jar里的spring.factories里的SentinelWebAutoConfiguration
apps 有该 app 对应的 AppInfo 则返回该对象没有则给该 app 创建一个新的 AppInfo 对象并添加到 apps 这个ConcurrentMap 中
ip 是客户端的机器ipport 是客户端何服务端通信的端口号,默认 8719 占用则 +1
MetricBucket
LongAdder[]- 计数器数组
pjp.proceed()
SystemSlot#entry
return
FlowRuleNacosProvider#getRules
MachineRegistryController#receiveHeartBeat
发布到 Nacos -> Nacos 推送配置到客户端-> 更新 sentinel 规则
http://ip:port/getRules
array就是放了一个个时间窗格的时间窗口数组
node.decreaseThreadNum()
slotChainBuilder.build()
LeapArray<MetricBucket>
DynamicRulePublisher
校验资源流控规则
node.addPassRequest(count)
rollingCounterInSecond.addBlock(count)
总调用次数 +1
catch (BlockException ex)
尝试着去调用一次请求
http://ip:port/setRules
HandlerInterceptor
将最新的流控规则列表转成以资源名为key,对应的流控规则列表为value 的Map
令牌桶算法
entry.exit()
0ms
DefaultController#canPass
最后会去清理Context和Entry、恢复调用栈,说实话这里不是很懂,应该是和这个调用链路有关系的,后面再琢磨吧
ArrayMetric
ServerThread.run()
从内存中获取客户端的机器的所有流控规则
实例化rollingCounterInSecond为例
HeartbeatSenderInitFunc.init()
FlowRuleManager.register2Property(dataSource.getProperty())
DegradeSlot#entry
@AspectSentinelResourceAspect
SentinelWebInterceptor
serverInitTask.run()
NacosDataSourceFactoryBean
windowStart == old.windowStart如果old时间窗格的起始时间和依据当前时间计算出的时间窗格起始时间相等,那么这个old 时间窗格就是最新的时间窗格,当前时间也就是落在这个时间窗格内,直接返回old
CommandCenterInitFunc.init()
构造方法
calculateWindowStart(timeMillis)
nacos 配置变化推送数据到客户端时会执行nacos 的监听器更新数据
DegradeSlot
FileWritableDataSource#write
WindowWrap<T> old = array.get(idx)
长连接拉取配置
old为null如果old时间窗格为空,则生成一个新的时间窗格,并通过CAS操作将新的时间窗格添加到时间窗口数组中
ExceptionCircuitBreaker
时间窗格中的PASS类型的计数器 +1时间窗格里的请求数据统计实际是放在counter数组里的,不同的下标代表不同的统计指标
curThreadNum.increment()
添加并排序order越小越靠前
StatisticNode- rollingCounterInSecond- rollingCounterInMinute
rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec()
currentWindow(TimeUtil.currentTimeMillis())
spi加载InitFunc接口sentinel-transport-common包和sentinel-core包
子流程获取当前时间戳所在的时间窗格slot
windowStart > old.windowStart如果根据当前时间计算出的时间窗格的起始时间大于old时间窗格的起始时间,那么就将old时间窗格重置,变为当前时间应该落入的时间窗格
@Bean切面类
windowLengthInMs
如果超过了熔断的时间周期,那么断路器改为半开状态,尝试着调用一次请求retryTimeoutArrived() && fromOpenToHalfOpen(context)
SentinelProperties#datasource
负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径来限流降级
wrap.value().addPass(count)
WindowWrap<MetricBucket> wrap = data.currentWindow()
FlowControllerV1#apiAddFlowRule
ClientWorker.LongPollingRunnable#run
2. 将新的流控规则写入数据源
machines set 机器链先清除该机器信息看下 MachineInfo 的 equals 和 hashCode 方法知道 MachineInfo 比较的是 app、ip、port
timestamp:2300 ---> idx: (2300/200)%5=1 ===> old,窗格索引位置是1代表第二个窗格(B1) ---> windowstart: 2200 ===> 当前时间戳所在窗格的起始时间 - 此时 windowStart > old ---> 需要去重置这个 old bucket,更改起始时间为2200,value重置 ---> 发现没,这5个窗格bucket建好后,不会再去创建bucket,只需要修改它的起始时间就能实现 滑动效果,这样就不需要去不断重复的去创建对象了 - 还有个相当重要的一点(非常重要),重置这个bucket之后,逻辑上讲这个窗格跑到了最后一个位置, 但是,实际上它的物理位置(也就是这个bucket在数组中的索引位置)是不变的 - 上一点很重要,怪不得我计算idx后感觉对不上位置,修改的只是每个bucket的属性值,索引位置是不变的呀 - circle array
从时间窗口数组里获取依据当前时间计算出的数组下标对应的old时间窗格
AuthoritySlot#entry
排队等待
否则 rt <= maxAllowedRt半开状态尝试调用的一次请求的响应时间没有超过最大rt阈值则将断路器关闭
根据每个rule的设置来确定对应的Node用于获取统计的数据
滑动时间窗口计数实现
执行注解@SentinelResource属性fallback指定的方法
rollingCounterInSecond.addPass(count)
实现
执行lambda表达式获取当前资源的流控规则
查询规则列表
flowRules.clear()
(old) B0 B1 B2 NULL B4 |_______||_______|_______|_______|_______|_______||___ ... 1200 1400 1600 1800 2000 2200 timestamp
执行对应的CommandHandler
fromHalfOpenToOpen(1.0d)
如果总调用次数小于降级规则中配置的最小请求数,那么不会去改变断路器的状态,直接return
创建 Nacos 的 ConfigService 对象
epository.save(entity)
DefaultProcessorSlotChain.addLast((AbstractLinkedProcessorSlot<?>) slot)
try...catch
ClusterNode簇点节点
@CommandMapping(name = \"getRules\")
context.getCurEntry().setError(e)
appInfo.addMachine(machineInfo)
DefaultProcessorSlotChain
Default Node B
从数据源读取数据并转成对应的 rule 对象
AutoRefreshDataSource
将 nacos 推送进来的 sentinel 限流规则配置 json 数据转成sentinel 规则实体对象
异常调用比例异常调用数
counter.totalCount.add(1)
响应给服务端
匿名类
EntranceNode:在 ContextUtil.enter(xxx) 的时候就创建了,然后塞到 Context 里面。NodeSelectorSlot:根据 context 创建 DefaultNode,然后 set curNode to context。ClusterBuilderSlot:首先根据 resourceName 创建 ClusterNode,并且 set clusterNode to defaultNode;然后再根据 origin 创建来源节点(类型为 StatisticNode),并且 set originNode to curEntry。
Node
HttpEventTask.run()
如何将 sentinel 限流规则写入 Nacos?需要改造 sentinel dashboard 的 controller 的源码
for (CircuitBreaker cb : circuitBreakers) { 遍历资源的所有断路器,进行校验}
FetchActiveRuleCommandHandler#handle
构建链路
Listener#receiveConfigInfo
sentinel客户端与服务端dashboard的交互过程
executor.submit(new ServerThread(serverSocket))
遍历执行每个 InitFunc
CLOSED
warmup
加载最新的流控规则到客户端本地内存中
遍历当前资源的所有流控规则进行校验只要一个校验不通过都会抛出BlockException
这块对于Socket不是很熟,基本面能够猜到是怎么做的,几个Socket的细节还是得看下基础
AbstractSentinelInterceptor
FlowSlot
nacos 作为数据源 实现sentinel 的规则持久化
将 datasource 的SentinelProperty 属性注册到对应的 sentinel rule manager 中
NacosDataSourceProperties
commandHandler.handle(request)
InitExecutor.doInit()
加载 Env 的静态代码块
w.func.init()
写入数据源
添加流控规则
客户端机器信息添加到 machines 机器链中
sentinel-dashboard
将新的规则放入
计算出总的慢调用次数和总调用次数slowCount += counter.slowCount.sum(); totalCount += counter.totalCount.sum();
dataSourceProperties.postRegister(newDataSource)
计算当前时间所在时间窗格的起始时间
赋值configListener创建一个新的Listener
每中限流规则都有自己的 Listener 来处理,这里以流控规则举例
ConfigService#publishConfig
子流程增加调用完成数和响应时间
SentinelProperties
通过 SPI 加载 CommandHandler 接口刷选出有 @CommandMapping 注解的 CommandHandler
客户端需要引入如下依赖<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId></dependency>
快速失败
https://github.com/alibaba/Sentinel/wiki/Sentinel-%E6%A0%B8%E5%BF%83%E7%B1%BB%E8%A7%A3%E6%9E%90这个文档还是得看一下能解惑许多东西
DegradeSlot#exit
0 条评论
回复 删除
下一页