Eureka源码-详细版
2021-10-30 21:44:18 0 举报
netflix eureka源码剖析
作者其他创作
大纲/内容
InstanceInfoReplicator#onDemandUpdate
evictionLimit = 总实例数-总实例数*0.85(摘除服务实例限制)
目的是为了防止因为服务器时间或者gc原因导致定时任务晚于配置执行任务时间间隔(60s)
修改服务实例摘除时间evictionTimestamp
acceptorQueue
初始化配置EurekaServerConfigBeanimplements EurekaServerConfig
初始化eureka上下文initEurekaServerContext
5、从eureka server中抓取注册表(fetchRegistry)
PeerAwareInstanceRegistry#register进行注册
初始化注册对象InstanceRegistryextends PeerAwareInstanceRegistry
peerEurekaNodes.start()eureka集群启动
初始化RefreshablePeerEurekaNodesextends PeerEurekaNodes
2、读取到数据直接返回
nonBatchingDispatcher创建非批处理任务分发器
读写缓存: readWriteCacheMap
创建EurekaServerContext对象里面包含了eureka server中的所有信息(注册表、服务信息等)
初始化配置EurekaClientConfig加载eureka-client.properties
从只读缓存是否读取到数据
ALL_APPS全量
发送请求 http://localhost:8080/v2/apps抓取全量注册表数据
4、StatusChangeListener状态变更监听器ApplicationInfoManager.StatusChangeListener
http://localhost:8080/v2/peerreplication/batch/
registry.init(peerEurekaNodes)初始化本地注册表
获取需要摘除的服务实例数expireLeases
invalidateCache失效缓存-主动过期
guava中的LoadingCache默认180s失效读写缓存中的数据
2、调用discoveryClient.register注册
是
renew()执行续约心跳逻辑
ResponseCacheImpl#generatePayload读写缓存获取注册表信息
BatchWorkerRunnable多任务线程
EurekaConfigBasedInstanceInfoProvider.get基于构造器模式创建InstanceInfo对象此方法加了synchronized,说明系统中获取到的InstanceInfo对象都是同一个
和只读缓存数据比较如果不一致,则覆盖只读缓存
6、初始化实例状态覆盖规则
如果上一分钟续约次数<期望的发送心跳阈值,则进入自我保护机制
把监听器注册到应用信息管理器中applicationInfoManager.registerStatusChangeListener
默认的NotImplementedRegistryImpl对抓取注册表没有实现逻辑,可以自定义抓取,实现BackupRegistry接口,通过参数: eureka.backupregistry来指定自定义类即可
5、最近一分钟复制次数numberOfReplicationsLastMin
否
执行缓存更新定时任务默认30s执行一次
ApplicationResource.addInstance
http://localhost:8080/v2/apps/{appName}/{id}发送心跳
初始化eureka server编码器CloudServerCodecs
初始化配置EurekaClientConfigBeanimplements EurekaClientConfig
修改服务实例状态为DOWN
AbstractJerseyEurekaHttpClient#registerhttp://localhost:8080/v2/apps/appName
EurekaServerAutoConfiguration基于springboot自动装配
5、把读取到的数据返回
Register
Heartbeat
InstanceResource.cancelLease服务端处理下线接口
1、刷新服务实例信息2、刷新leaseInfo信息
ApplicationResourceTest#testGoodRegistration 可以基于单元测试进行debug
1、初始化线程池
2
InstanceResouce.renewLease服务端
释放线程调度资源cancelScheduledTasks
基于接口方法对外提供配置信息获取
3、最近一分钟续约数次数 renewsLastMin
三层队列批处理机制
初始化后台任务处理线程池TaskExecutors
recentlyChangedQueue添加数据到最近变更的队列中
ResponseCacheImpl#get(Key)KEY: ALL_APPS_DELTA
增量抓取注册表时也是基于读写缓存获取注册表
EurekaAutoServiceRegistrationeureka自动注册
初始化服务注册表实例对象PeerAwareInstanceRegistry
把创建好的EurekaServerContext交给EurekaServerContextHolder方便在整个系统中可以随时获取
2、初始化心跳线程池
基于HttpReplicationClient复制的client,添加头信息x-netflix-discovery-replication: true;其他的eureka server接收到请求之后只在本地执行,而不会再向其他的eureka server同步
把这些任务打成一个任务包list,放入batchWorkQueue
replicateToPeers同步数据到其他的eureka server
InstanceReplicationTaskAction.Cancel
初始化eureka环境initEurekaEnvironment
1、时间超过500ms2、超过250个任务
EurekaServiceRegistry#register启动时即注册
计算期望每分钟续约次数阈值updateRenewsPerMinThreshold
EurekaServerInitializerConfiguration
加入
把上面获取到的eureka上下文放到servletContext中
启动一个定时任务,默认10分钟执行一次 peersUpdateTask更新维护本地的集群node
1、注册使用的客户端EurekaHttpClient registrationClientSessionedEurekaHttpClient
SingleTaskWorkerRunnable单任务线程
4、从读写缓存中获取数据把数据写入只读缓存
基于Action行为来进行任务分发
获取队列中的批任务
初始化服务注册EurekaServiceRegistryimplements ServiceRegistry
PeerEurekaNode.register
初始化CloudEurekaClientextends eureka.DiscoveryClient
6、span style=\"font-size: inherit;\
内部监听器notify中逻辑
判断Key的值
关闭和eureka server网络通信资源
初始化EurekaClient(DiscoveryClient)
把任务放入pendingTasks
注入
初始化EurekaHealthCheckHandlereureka健康检查器
ApplicationsResource#getContainers基于jersey,接受请求
InstanceReplicationTaskAction.Heartbeat
InstanceResource.cancelLease
PeerAwareInstanceRegistryImpl#cancel
EurekaClientAutoConfiguration
eureka-server启动入口web.xmlEurekaBooStrap监听器
初始化配置EurekaInstanceConfigBeanimplements EurekaInstanceConfig
ApplicationsResource#getContainerDifferential基于jersey,接受请求
PeerEurekaNode.heartbeat
等待从其他eureka server中获取注册表,如果没有获取到,默认会重试5次,每次间隔30s
更新
初始化读写缓存readWriteCacheMap
7、初始化调度任务initScheduledTasks
3、未读取到数据
eureka源码
2、查询使用的客户端EurekaHttpClient queryClientSessionedEurekaHttpClient
初始化应用信息管理器ApplicationInfoManager
AbstractInstanceRegistry#register实际执行最终注册的代码方法
PeerReplicationResource#batchReplication批量复制
调用服务下线接口unregister
InstanceInfoReplicator#run进行服务注册
LeaseManager#renew
spring容器启动时执行onApplicationEvent基于serviceUrl创建PeerEurekaNode
2、定时心跳任务HeartbeatThread默认30s执行一次
初始化执行器acceptorExecutor
Action
AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions() 获取增量注册表数据
heartbeat
初始化客户端EurekaDiscoveryClientimplements cloud.DiscoveryClient
调用evict()摘除方法
启动摘除服务任务EvictionTask默认60s执行一次
createPeerEurekaNode创建peerEurekaNode
refreshRegistry刷新注册表
创建EurekaServerBootstrap
定时更新每分钟续约次数阈值默认15分钟执行一次(updateRenewalThreshold)
ConfigurationManager基于double check + volatile来创建单例
调用ReplicationTaskProcessor的process方法处理任务
cancel
PeerEurekaNode.cancel
失效缓存responseCache.invalidate
增加最近一分钟续约次数
http://localhost:8080/v2/apps/delta抓取增量注册表数据
register
服务实例数 * (60 / 期望客户端发送续约间隔(默认30s)) * 续约百分比(默认0.85)(numberOfRenewsPerMinThreshold)
1
是否开启了自我保护默认true
调用internalCancel摘除服务
如果某个服务实例已经在本地存在,获取到的服务实例的注册时间小于本次服务实例的最新更新时间,则使用本地服务实例信息,注册到本地注册表中,否则使用拉取过来的服务实例
EurekaServerBootstrap#contextInitialized初始化eureka server环境和上下文
创建acceptorThread线程并启动
InstanceResource.renewLease
未获取到数据,则全量抓取
1、缓存刷新任务CacheRefreshThread默认30s执行一次
1、eureka server启动2、DiscoveryClient初始化3、服务注册4、eureka集群: 注册表同步,多层队列批处理5、全量拉取注册表: 多级缓存机制6、增量拉取注册表: 一致性hash校验机制7、心跳机制: 服务续约8、服务下线: cancel(手动调用)9、服务故障摘除: evictionTask(bug)10、自我保护机制: 网络故障
ResponseCacheImpl#getCacheUpdateTask缓存刷新任务,每30s执行一次, 遍历只读缓存,如果只读缓存和读写缓存中的数据不一致,则用读写缓存数据覆盖只读缓存数据-被动过期
updateDelta span style=\"font-size: inherit;\
第一次全量抓取注册表DiscoveryClient#getAndStoreFullRegistry
只读缓存: readOnlyCacheMap
5、InstanceInfoReplicator.start() span style=\"font-size: inherit;\
InstanceReplicationTaskAction.Register
遍历注册表
2、初始化最近注册服务实例队列recentRegisteredQueue
初始化应用管理器ApplicationInfoManager
初始化eureka server上下文DefaultEurekaServerContext
AbstractInstanceRegistry#getApplications() 获取注册表中的全量信息
从acceptorQueue获取数据
过期判断Lease#isExpired: System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs(补偿时间))
batchingDispatcher创建批处理任务分发器
1、读取数据
调用父类postInit()方法,启动服务摘除任务
ReplicationTaskProcessor初始化复制任务处理器
遍历所有的Listeners调用notify方法
调用父类的下线方法internalCancel
Cancel下线
初始化配置EurekaServerConfig加载eureka-server.properties
3、实例化InstanceInfoReplicator实例信息副本组件,也是个线程
1、初始化最近下线服务实例队列recentCanceledQueue
更新最近一次的心跳时间(当前时间戳+90s)Lease#lastUpdateTimestamp
getAndUpdateDelta增量抓取注册表
AbstractInstanceRegistry#renew
ResponseCacheImpl#invalidate失效掉读写缓存readWriteCacheMap
可以禁用只读缓存,配置参数: eureka.shouldUseReadOnlyResponseCache默认为true
创建jerseyClientHttpReplicationClient
基于EurekaInstanceConfig初始化服务实例对象InstanceInfo
遍历只读缓存
ApplicationInfoManager#setInstanceStatus
如果本地计算出来的hash值和服务端返回的hash值不一致,则重新全量抓取注册表
把下线的服务实例加入recentCanceledQueue
从本地注册表中删除服务实例
初始化注册表缓存接口对象ResponseCache
4、30s执行一次从recentlyChangedQueue摘除最近三分钟变更的服务实例
基于本地全量的注册表数据生成一个hash值,和服务端返回的hash做比较
3、初始化缓存刷新线程池
计算摘除的服务实例的最大数量evictionLimit
EurekaServerContext.initialize启动集群、初始化本地注册表
计算补偿时间getCompensationTimeMs
ALL_APPS_DELTA增量
EurekaDiscoveryClientConfiguration
recentRegisteredQueue添加数据到最近注册的队列中
此处有个bug: 由于心跳时,修改lastUpdateTimestamp为当前时间 + duration(默认90s), 而此处判断时又加了一个duration,导致整个服务实例故障摘除时间为90s * 2=180s,而不是90s
HttpReplicationClient#submitBatchUpdates批量发送任务
spring cloud集成eureka源码
registerHealthCheck注册健康检查处理器
把服务实例加入recentlyChangedQueue
registry.openForTraffic启动检查服务实例是否故障宕机
初始化配置EurekaInstanceConfig加载eureka-client.properties
创建PeerEurekaNodes对象可以理解为eureka集群(用来管理每个PeerEurekaNode生命周期)
ResponseCacheImpl#get(Key)KEY: ALL_APPS
http://localhost:8080/v2/apps/{appName}/{id}DELETE请求
@EnableEurekaServereureka server启动入口
1、遍历recentlyChangedQueue(只保留了最近三分钟有状态变更的服务数据)队列中的数据,把数据封装为Applications2、针对本地全量的注册表数据,计算出一个hash值,同时返回给客户端
把任务放入processingOrder队列
1、阈值大于02、最近一分钟续约次数()大于阈值
是否抓取到数据
收藏
0 条评论
下一页