eureka总结
2021-06-30 16:12:21 0 举报
个人针对netfix中整理的1.7的eureka
作者其他创作
大纲/内容
registry.getApplicationDeltas()
返回null,证明这个是新的任务,如果不是nbull,则证明该key原先已经绑定一个任务
new AcceptorRunner()
EurekaServerContextHolder
将reprocessQueue中的元素取出来放入pendingTasks和processingOrder中,满了则情况reprocessQueue?
EurekaClient
组装
将pendingTask取出来添加到batchWork中
每60s执行一次
创建分发器
2.2 每30s发送心跳
调用框架
有数据,返回
取出task列表
eurekaTransport.registrationClient.register(instanceInfo)
创建缓存服务ResponseCacheImplResponseCacheImpl
执行任务
这个是远程服务实例如果本地服务都挂了,可以去远程拉去实例的意思
replicationClient.submitBatchUpdates(list);
是
刷新下内存中的ip:port等信息
readWriteCacheMap.get(key);
请求的方法类
Lease<InstanceInfo>
拉取是否成功
PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers
生成一个编码器
peerEurekaNodes.start()
初始化
taskExecutor
fetchRegistryFromBackup()
MeasuredRate.start()
initializedResponseCache()
A
ResponseCacheImpl#generatePayload
不为空,则update
这里会判断是否是复制方法addExtraHeaders(requestBuilder) 这里设置的
registry.get(registrant.getAppName())
PeerReplicationResource#batchReplication
fetchRegistry
batchingDispatcher
创建
2. 初始化调度任务(注册逻辑在这里)
G 构建上下文
InstanceResource#renewLease
registry.openForTraffic();
父类注册核心逻辑
1、重新处理的队列
这里只会失效readWriteCache,并不会失效readOnlyCache,后续通过定时器同步两个缓存
封装一下
本地增删改localRegionApps
延迟40s后开始
previousTask == null
返回数据是否为空
BatchWorkerRunnable
batchingDispatcher.process();
增量拉取定时器
否
判断pendingTasks是否大于maxBufferSize
续约计数器
是否有值
实例化的时候启动
workerThread.start();
一个个处理
每一分钟的时候更新lastBucket,并重新设置currentBucket
创建一个任务
isFull()
instanceRegionChecker
更新心跳阈值
B
EurekaInstanceConfig
useReadOnlyCache是否先拿缓存
getTasksOf(holders)
保存到localRegionsApps
1
MeasuredRate
获取配置的集群地址resolvePeerUrls()
反写
获取本地和保存在本地的远程区域的registry.getApplications()+remoteAppsDelta
ServerCodecs
加入处理队列
acceptorThread.start()
getApplicationsInternal(\"apps/delta\
cacheKey(ALL_APPS_DELTA)
所有服务不可用的时候,总备份类中获取,在创建的时候传进来,其实目前是没实现的
createReplicationListOf(tasks)
H
设置环境变量加载properties文件
replicationTask
write.lock();
processor.process(tasks)
getApplicationsInternal(\"apps/\
assignBatchWork()
实际走的
initScheduledTasks()
每隔30s刷新下readOnlyCacheMap
initEurekaServerContext核心流程
expectedNumberOfRenewsPerMinnumberOfRenewsPerMinThreshold
eurekaTransport
instanceInfoReplicator.start();
E 创建服务感知
scheduleRenewalThresholdUpdateTask()
PeerEurekaNode是集群的节点,当一个服务注册上来的时候,通过这个复制到其他节点
每隔60s
updateRenewalThreshold()
serverContext
SingleTaskWorkerRunnable
InstanceInfo
这里会过滤旧的不在配置文件中的地址
DefaultEurekaServerConfig
有则判断下本地的是否更新,是则取本地的
变化队列中获取recentlyChangedQueue
2、请求的队列
discoveryClient.refreshInstanceInfo()
2
DiscoveryClient#renew
registry.init(peerEurekaNodes);
getCacheUpdateTask()
计算拉回来的hashcodegetReconcileHashCode
drainAcceptorQueue();
提交
如果不一致,则重新全量拉取reconcileAndLogDifference()
apps
DefaultEurekaClientConfig
覆盖续约
recentRegisteredQueuerecentlyChangedQueue
peerEurekaNodes
initRemoteRegionRegistry()
http请求工具
ResponseCacheImpl#get()
instanceInfoReplicator
实例信息
是否全量抓取
super.postInit()
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get())
discoveryClient.register();
hasEnoughTasksForNextBatch
ApplicationsResource#getContainers
AbstractJersey2EurekaHttpClient#sendHeartBeat
根据服务名获取到该服务的注册表为空则创建并返回去
ResponseCacheImpl
保存在本地区域内
判断是否本地区域
registry.syncUp()
是,更新只读缓存并返回
acceptorQueue.add()acceptedTasks++;
先去ApplicationsResource.getApplicationResource再去ApplicationResource.getInstanceInfo 该类接受
PeerAwareInstanceRegistry
是否有数据
全量获取registry.getApplications()
readOnlyCacheMap.get(key)
C
2.1 默认30s刷新注册表(delta)
AbstractConfiguration
批量提交
serverContext.initialize();
注册到其他集群上
F 创建节点集群实例
CacheRefreshThread
没有,证明是新加入实例更新下心跳阈值跟频率
心跳线程池
deltaRetentionTimer
创建PeerEurekaNode
recentCanceledQueuerecentCanceledQueue
drainInputQueues()
将处理中的task从等待中移除?
2.3 创建一个同步器,每30s同步一次
将localRegionApps中的实例注册到registry
initEurekaEnvironment初始化环境变量
ApplicationResource#addInstance
判断是否足够一个批次队列中大于最大size或者最新一个的延迟时间大于设定的时间对应getWork()
cacheKey(ALL_APPS)
将task加入processingOrder
是否获取远程区域的
acceptorExecutor(acceptorQueue)
3、 阻塞等待加入
invalidateCache();
D
该注册实例是否已经存在注册表中
依次同步给其他实例PeerAwareInstanceRegistryImpl#replicateToPeers
DefaultEurekaTransportConfig
1. 抓取注册表单机不走这里
ApplicationsResource#getContainerDifferential
缓存线程池
new EvictionTask()
失效缓存
是,全量再来
overriddenTasks++
创建变化队列
更新下每次续约的频率和时间renewsLastMin.increment();Lease#renew
AbstractInstanceRegistry#register
pendingTasks.remove(processingOrder.poll());
drainReprocessQueue();
获取本地和保存在本地的远程区域的变化的实例recentlyChangedQueue+remoteAppsDelta
getWork()
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
applicationInfoManager
该玩意用来处理注册、心跳、下线及将这些状态复制到其他机器用的,服务也保存在这里!!
这里会根据心跳更新时间,来将过期的服务失效,这里有服务保护功能的判断,而且有个bug,原先需要90s就可以过期,但这里判断了90*2s之后才算过期,另外,这里会根据阈值失效某些服务,但是不会一次性全部失效了
HeartbeatThread
EurekaConfigBasedInstanceInfoProvider
封装成TaskHolder
添加到最新变化队列
PeerAwareInstanceRegistryImpl#register
getAndUpdateDelta抓取更新的注册表
getAndStoreFullRegistry全量抓取注册表
heartbeatExecutor
DiscoveryClient#updateDelta
EurekaBootStrap#contextInitialized
cacheRefreshExecutor
获取acceptorExecutor中的workQueue中的TaskHolder列表
每隔15分钟
同步到其他服务上去
regionNameVSRemoteRegistry
1.1
DynamicPropertyFactory
appendTaskHolder
转换成ReplicationList
processingOrder.add()
0 条评论
回复 删除
下一页