Eureka源码分析
2021-10-09 17:06:29 0 举报
eureka服务端和客户端的源码运行流程图
作者其他创作
大纲/内容
设置实例的状态
释放锁,返回批量处理的任务队列
为开启直接从读写缓存中获取实例信息
当重试请求的队列不为空并且任务没有满时
执行run方法
发布eureka可以注册的事件
(evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs))
执行任务的run方法
long now = System.currentTimeMillis()
重新修改实例的dirty标志位
heartbeatTask = new TimedSupervisorTask( \"heartbeat\
更新每分钟续约数量的阈值
start()
new ReplicationClientAdditionalFilters
httpResponse = eurekaTransport.registrationClient.register(instanceInfo)
为空,说明这是个新的实例,以前注册表中并没有,更新续约数量,注册一个新的实例
registrant = existingLease.getHolder()
开始执行expectedNumberOfRenewsPerMin定时更新 默认60s一次每分钟统计下当前1分钟内接收到了多少个心跳请求,并且存储起来
taskHolder != null
payload = currentPayload
如果线程中断退出
bindEIP()
生成EIP绑定对象
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls)
isInstanceInfoDirty = true; lastDirtyTimestamp = System.currentTimeMillis();
isShutdown.get()
@Bean
使该实例的缓存失效
由于客户端希望剔除这个新的实例,因此减少一个发送续订的客户端数量
设置延迟时间
applicationInfoManager.refreshLeaseInfoIfRequired()
遍历实例信息列表
read.unlock()
Heartbeat
返回true
为空没找到服务返回false
new EurekaServiceRegistry()
Register
Lifecycle
renew()
是否启动标志默认false
peerEurekaNodes.start()
!isSuccess(statusCode)
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()))
maybeInitializeClient(reg)
不为空获取实例信息
判断实例信息是否为空
for (Application app : apps.getRegisteredApplications())
这是一个>而不是>=因为如果时间戳相等,我们仍然使用远程传输的InstanceInfo而不是服务器本地副本
run()(TimedSupervisorTask)
将失败重试的请求入队到任务队列
如果任务队列满了,从有序链表中移除第一个任务id,丢弃最老的任务
InstanceInfo instanceInfo = leaseToCancel.getHolder()
通过CAS改变启动标志位
获取补偿时间
计算保留最少保留的服务实例数量
返回状态
不为空,将批处理的任务列表加入到队列中
统计过期的任务数量
singleItemWorkRequests.release();return singleItemWorkQueue;
eurekaServerContext()
run()
scheduleRenewalThresholdUpdateTask()
responseCache.get(cacheKey)
更新服务状态
new TimerTask()
开启eureka server注解
是否开启只读缓存默认为true
增量更新
reg.getApplicationInfoManager()\t\t\t\t.setInstanceStatus(reg.getInstanceConfig().getInitialStatus())
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())
implements
最大的缓冲大小maxElementsInPeerReplicationPool(10000),批处理请求的最大个数batchSize(250),批处理的最大线程数maxThreadsForPeerReplication(20),在分派一批任务之前等待新任务的最长时间maxBatchingDelayMs(500ms),服务阻塞等待的时间serverUnavailableSleepTimeMs(1000ms),如果网络故障,则等待retrySleepTimeMs(100ms)重试,
将任务添加到任务map中,id相同会覆盖并返回以前的任务
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())(RestTemplateEurekaHttpClient)
boolean success = fetchRegistry(remoteRegionsModified)
获取要处理的任务并从任务队列中移除
获取执行任务阻塞或超时的时间
AutoServiceRegistrationConfiguration自动服务注册
ServletContextAware
dirtyTimestamp != null
任务没完成将上次任务取消
springboot类似SPI的机制,一些配置类会放在这个文件下
Renews threshold = 服务实例总数 *(60/续约间隔)*自我保护续约百分比阈值因子。Renews(last min) = 服务实例总数 * (60/续约间隔)
Y,都为空,说明集群节点没有改变
如果已存在的实例最后修改的时间戳大于要注册的那么就用已存在的替换要注册的
获取要注册的实例的最后修改时间
注册表信息
!isShutdown.get() && result == null
getCacheUpdateTask()
instanceInfoReplicator.onDemandUpdate()
registrant.setLastUpdatedTimestamp()
batchWorkRequests.release();return batchWorkQueue;(AcceptorExecutor)
overriddenTasks++
不是成功返回
SmartLifecycle
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id)
将读写缓存的值更新同步到只读缓存中
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache()
为空,释放锁
heartbeatTask心跳续约
REGISTER.increment(isReplication)
刷新缓存
从读写缓存中获取值
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs()
!isShutdown.get()
开启一个定时任务,默认一分钟执行一次
更新续约时间
!isEIPBound
当失败重试的请求队列不为空,接受请求的队列不为空或者正在执行的任务map为空时
pendingTasks.size() >= maxBufferSize(1000)
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold())
将服务实例加入到最近取消的队列中
Congestion
自身节点不需要复制
构建一个批处理任务执行器
判断接受请求的队列是否为空
创建服务拉取的任务cacheRefreshTask并监听
!reprocessQueue.isEmpty() && !isFull()
如果在,从列表中删除,将该节点关闭
如果scheduleTime 当前时间当前时间加上延时时间
将该任务放入到执行的任务队列中,id相同会覆盖
holders.size() < len && !processingOrder.isEmpty()
!processingOrder.isEmpty()
private ApplicationInfoManager applicationInfoManager
初始化eureka环境配置信息
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info)
调用任务处理器处理任务
为空,从读写缓存中获取
如果不为空,从注册表中移除
更新实例最近修改时间
使缓存失效
Applications applications = getApplications()
getContainers
StatusUpdate
加入到最近注册的循环队列中
pendingTasks.size() >= maxBufferSizedelay >= maxBatchingDelay
payload = readWriteCacheMap.get(key)
EurekaClientAutoConfiguration
Success
run()(CacheRefreshThread)
更新状态
服务注册
return
initializedResponseCache()
instanceInfo.setLastUpdatedTimestamp()
判断期望收到客户端续约的总数 (服务的总数)是否大于0
drainReprocessQueue()
获取任务id
successCounter.increment()
Y,不为空,说明有节点需要关闭下线,遍历节点列表,获取每一个节点
this.scheduledPeriodicRef = new AtomicReference<Future>()
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs
count++
hasEnoughTasksForNextBatch()
更新实例状态
返回成功处理返回信息
获取绑定策略,默认是EIP
ProcessingResult.Congestion
new RefreshablePeerEurekaNodes
更新续约数并开启剔除定时任务
PeerEurekaNode eurekaNode = newNodeList.get(i)
创建一个Jersey的客户端对象,主要用来进行网络通信发送请求
ApplicationResource
(EIPManager)start()
this.applicationContext.publishEvent(event)
从集群节点中获取将要关闭的节点urls
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp()
判断是否允许后台监听改变,默认为true
没有过期加入到任务队列中
refreshRegistry()
clientConfig.shouldRegisterWithEureka()
上次修改时间改为当前时间加上90s
释放读锁
this.serviceRegistry.register(this.registration)(EurekaServiceRegistry)
开启集群节点的同步
apps/GET
@Autowired
Y,不为空,说明有节点需要新加入到集群,遍历要新加的节点列表
!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())
从url地址中获取端口信息
task.handleSuccess()
获取最大的同步复制时间,默认30s
如果正在执行的任务中包含该id,说明已经有该任务但是还没执行结束,统计覆盖的任务数量
没有停止,从Acceptor监听请求的线程中获取任任务队列里要执行的TaskHolder
直接返回退出循环
@Bean\tpublic Marker eurekaServerMarkerBean() {\t\treturn new Marker();\t}
加上读锁
break
从缓存中获取服务信息
开启一个守护线程负责监听请求
获取时间补偿的这个方法是防止网络波动,JVM的stop world造成的任务没有按照设定好的时间进行执行,弥补这个现象造成的影响
从只读缓存中获取值
Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls)
实例化
用单处理分发器消费
构建一个同步任务处理的对象
EurekaAutoServiceRegistration
readWriteCacheMap.invalidate(key)
如果读写缓存和只读缓存中的值不一样
每30s执行一次
InstanceInfo instanceInfo = leaseToRenew.getHolder()
从相邻的eureka节点同步复制注册表中并返回数量
判断是否允许该服务注册到eureka上,默认为true
switch (action)
latestPeriodic != null && !latestPeriodic.isDone()
从overriddenInstanceStatusMap移除该实例
小于,直接进行同步节点数据
singleItemWorkQueue.add(holder)
实例状态更新
心跳请求
心跳请求PUT
applicationInfoManager.setInstanceStatus(status)
不为空直接返回
自启动以来监测的实例注册的数量加1
updateRenewsPerMinThreshold()
switch (result)
将eureka服务上下文信息存入到ServletContext
renewLease
先从shuffledInstances属性里获取,如果没有再从instances属性获取
判断返回的状态码
先从只读缓存中获取实例
N
run()(HeartbeatThread)
刷新实例信息
future = executor.submit(task)
run()(InstanceInfoReplicator)
TransientError
从重试请求任务队列里获取并移除最后一个任务
initScheduledTasks()
Y
(EurekaServerBootstrap)contextInitialized(ServletContext context)
没有集群节点或者已经复制直接返回
最大的缓冲大小maxElementsInStatusReplicationPool(10000)个,处理的最大线程数maxThreadsForStatusReplication(1),在分派一批任务之前等待新任务的最长时间maxBatchingDelayMs(500ms),服务阻塞等待的时间serverUnavailableSleepTimeMs(1000ms),如果网络故障,则等待retrySleepTimeMs(100ms)重试
boolean success = register()
new TaskExecutors<>(idx -> new SingleTaskWorkerRunnable<>(\"TaskNonBatchingWorker-\
心跳续约请求
apps/deltaGET
获取上次的结果
new InstanceRegistry
开启一个eureka自我保护的定时任务,默认15分钟一次更新每分钟续约数量的阈值
主要负责网络抖动和服务故障失败后的重试
notify
将实例信息放入注册表中
通过rest发送注册请求
addInstance
registrySyncRetries默认为0,因为在eureka server初始化启动的时候不需要同步节点数据
leaseToCancel.cancel()
return count
如果获取到了数据
EurekaDiscoveryClientConfiguration
创建instanceInfoReplicator任务
localRegionApps.get()
EurekaServerContextHolder.initialize(this.serverContext)
scheduleTime <= now
synchronized (lock)
clientConfig.shouldOnDemandUpdateStatusChange()
服务注册请求POST
开启同步实例,服务注册的定时任务第一次延迟40s执行后面默认30s执行一次
true同步数据到其他节点
检查实例是否可以在region中注册
extends
false直接返回
node.register(info)
batchingDispatcher.shutdown(); nonBatchingDispatcher.shutdown(); replicationClient.shutdown();
atchWorkRequests.release()
Value currentPayload = readOnlyCacheMap.get(key)
通过信号量尝试获取锁
useReadOnlyCache
leaseToRenew == null
作为参数传入
开始eureka server的初始化
taskHolder.getExpiryTime() <= now
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list)
通过Jersey提交批量任务发送网络请求
registrant.setStatusWithoutDirty(overriddenInstanceStatus)
Optional.ofNullable(shuffledInstances.get()).orElseGet(this::getInstancesAsIsFromEureka)
如果是404,代表之前该服务不在注册表中那么就直接注册
不为空说明新的任务覆盖之前的任务
private EurekaClient eurekaClient(DiscoveryClient)
将任务构建成一批ReplicationTask的列表
判断任务是否覆盖
通过rest请求,发送心跳
instanceInfo.unsetIsDirty(dirtyTimestamp)
根据覆盖的状态规则设置状态
获取客户端本地的服务信息
run()(AcceptorRunner)
statusCode == 503
scheduler.submit(new Runnable()
previousTask == null
获取实例上次更新时间
首先检查 这个实例是starting 还是down的。然后在检查已知明确的override,最后我们检查已经存在续租实例潜在的状态
scheduleTime < now
getApplicationsInternal(\"apps/\
如果不为空说明实例信息已经修改过重新注册服务
获取实例信息
getContainerDifferential
没有任务,等待10ms
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity
返回分发器对象
调用run方法
通过rest向服务端发送请求
通过更新收回时间取消租约
如果不为空,从队列中获取请求加入到任务map中
依赖注入
(EvictionTask)run()
全量拉取更新
将读写缓存中的数据写入到只读缓存中
this.maxProcessingDelayMs = config.getMaxTimeForReplication()
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound()
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1
delay.set(timeoutMillis)
!overriddenInstanceStatusMap.containsKey(registrant.getId())
拉取服务的时间间隔,默认为30s
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
InstanceResource
使读写缓存失效
return false
Y判断overriddenInstanceStatusMap是否包含有该实例的id
遍历applications队列
super.postInit()(AbstractInstanceRegistry)
Y,都为空,在接受请求的队列阻塞10秒钟获取数据
ProcessingResult.PermanentError
执行定时任务,默认30s执行一次
将任务请求入队
在AWS云中处理EIP绑定过程
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel))
不为空,获取已存在的实例的最后修改时间
for (InstanceInfo instance : app.getInstances())
获取对应的规则
获取缓存更新时间间隔,默认30s
DeleteStatusOverride
清空注册表信息
cacheValue != currentCacheValue
获取最大批处理数量和要处理任务数量中小的一个
(AbstractInstanceRegistry)overriddenInstanceStatusMap.clear(); recentCanceledQueue.clear(); recentRegisteredQueue.clear(); recentlyChangedQueue.clear(); registry.clear();
获取任务id并从处理的列表中移除
!acceptorQueue.isEmpty()
private InstanceRegistryProperties instanceRegistryProperties
recentlyChangedQueue.add(new RecentlyChangedItem(lease))
latestPeriodic.cancel(false)
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs())
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes)
reprocessQueue.clear()
判断任务的过期时间
AcceptorRunner线程每10ms轮询一次
从注册表中获取该实例的信息
通过单处理消费队列任务
peerEurekaNodes == Collections.EMPTY_LIST || isReplication
handleEIPBinding()
计算可以摘除的服务实例数量
this.expectedNumberOfClientsSendingRenews > 0
new EurekaController(this.applicationInfoManager)
Runnable peersUpdateTask = new Runnable()
queueOverflows += reprocessQueue.size()
long compensationTimeMs = getCompensationTimeMs()
定时任务scheduleServerEndpointTask
注册监听器
缓存刷新执行器指数回退相关属性,默认为10
totalItems == processingOrder.size()
创建一个批处理的任务分发器对象
new CloudServerCodecs(this.eurekaServerConfig)
获取总共要处理的任务数量
释放锁
构建一个非批处理任务执行器
int totalItems = processingOrder.size()
assignBatchWork()
判断是否有状态不为UNKNOWN的重写状态
Thread.sleep(10)
sendHeartBeat()(RestTemplateEurekaHttpClient)
是否开启只读缓存,默认开启
singleItemWorkRequests.release()
InstanceStatus prev = instanceInfo.setStatus(next)
我们可以自己实现监听事件来获取eureka一些相关信息
drainAcceptorQueue()
batchWorkRequests.tryAcquire(1)
applicationInfoManager.registerStatusChangeListener(statusChangeListener)
获取批量处理的任务队列
@EnableDiscoveryClient
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++)
判断该节点是否在要下线的节点列表中
cacheRefreshTask = new TimedSupervisorTask( \"cacheRefresh\
在开启定时任务之前,先执行一次更新集群节点的操作,因为eureka服务启动需要将自己本身节点信息更新
int registrySize = (int) getLocalRegistrySize()
失效很多种缓存,比如增量,全量,服务等
getAndUpdateDelta(applications)
updateInstanceRemoteStatus()
获取当前服务实例数量
构建一个单个任务处理的任务执行器
initEurekaEnvironment()
根据registrySyncRetries和count循环遍历
通过批处理消费队列任务
initEurekaServerContext()
PermanentError
(EIPBindingTask)run()
创建一个集群节点
获取返回状态
判断任务是否已经满了
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get())
添加过期的实例到列表中
是否开启只读缓存,默认为true
selectImports
notify()
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold())
this.awsBinder.start()
getAndStoreFullRegistry()
存放返回结果
Value currentCacheValue = readOnlyCacheMap.get(key)
更新eureka集群节点
获取当前时间戳
刷新注册表拉取最新服务信息
N,失败回调
定义一个批处理的任务列表
初始化远程region的注册
initialize()(DefaultEurekaServerContext)
peerEurekaNodes.isThisMyUrl(node.getServiceUrl())
将任务id添加到有序链表中的第一个
this.registry = registry
EurekaServerInitializerConfiguration
定义count=0
registry.init(peerEurekaNodes)(PeerAwareInstanceRegistryImpl)
从TaskHolder中获取要执行的任务列表
onCacheRefreshed()
cacheRefreshTask服务发现
Y,成功回调
new Thread().start()开启一个线程执行线程体
new ArrayList<Application>(this.applications)
存放任务执行的返回值
这里控制上次出现同步错误后,在100/1000ms内不能再次执行
实例覆盖状态删除
如果满了,则统计队列溢出数量加上重试请求的队列大小
appendTaskHolder(acceptorQueue.poll())
设置任务超时时间为当前时间加上getLeaseRenewalOf(info)的时间第一次实例信息为空是90s,以后为30s
拉取注册表信息
如果有必要刷新实例的租约信息
接受请求入队到任务队列
创建一个核心线程数为1定时执行的线程池,并将线程设置为后台线程
new TaskExecutors<>(idx -> new BatchWorkerRunnable<>(\"TaskBatchingWorker-\
new InstanceReplicationTask
通过监听状态改变事件调用notify
源码入口
设置任务超时时间为当前时间加上最大延迟时间默认30s
clientConfig.shouldFetchRegistry()
开启执行instanceInfoReplicator任务
evict(compensationTimeMs)
续约成功设置lastSuccessfulHeartbeatTimestamp为当前时间
构建一个AcceptorExecutor对象,主要负责开启一个线程接受请求
线程池执行任务并返回结果
发布eureka已启动的事件
构建一个list用来存放过期的租约实例
scheduledPeriodicRef.set(next)
判断是否移除成功
Value cacheValue = readWriteCacheMap.get(key)
newNodeList.add(createPeerEurekaNode(peerUrl))
statusChangeListener = new ApplicationInfoManager.StatusChangeListener()
processingOrder.addFirst(id)
this.started = new AtomicBoolean(false)
int count = registry.syncUp()
遍历instance列表
isFull()
this.lastUpdatedTimestamp = System.currentTimeMillis()
同步数据到其他节点
心跳续约
Y将overriddenStatus不为unknown的要添加到overriddenInstanceStatusMap
int evictionLimit = registrySize - registrySizeThreshold
leaseToCancel = gMap.remove(id)
assignSingleItemWork()
创建一个非批处理的任务分发器对象
将实例加入到最近修改的队列中
if (evictionTimestamp <= 0) { evictionTimestamp = System.currentTimeMillis(); }
开启定时剔除任务EvictionTask,默认60s一次
获取实例的状态
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId())
成功数量加一
renewsLastMin.start()
leaseToRenew.renew()
导入EurekaServerMarkerConfiguration组件
将运行状态改为true
lastUpdateTimestamp = System.currentTimeMillis() + duration
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds()
调用批处理分发器处理实例覆盖状态的删除任务
read.lock()
调用批处理分发器处理服务下线任务
当线程处于活跃状态
httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()
scheduleTime = now + trafficShaper.transmissionDelay()
创建一个核心线程数为2并设置为后台线程的定时执行的线程池调度任务执行器
@Import(EurekaServerMarkerConfiguration.class)
往容器中导入组件
设置任务超时时间为当前时间加上最大延迟时间,默认30s
isSuccess(statusCode)
EurekaServerInitializerConfiguration.this.running = true
existingLastDirtyTimestamp > registrationLastDirtyTimestamp
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp()
大于,休眠registrySyncRetryWaitMs再进行同步节点,默认是30S
N,将集群节点放到一个新的节点list
this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
调用批处理分发器处理服务注册任务
将更改的状态更新到实例中
调用批处理器处理心跳任务
discoveryClient.register()
创建一个核心线程数为1最大线程数为2并设置为后台线程的线程池缓存刷新执行器cacheRefreshExecutor
开启剔除定时任务
开启eureka集群节点更新的定时任务,默认10分钟执行一次
执行任务run方法
调用分发器的process会触发acceptor的process
同步的客户端对象
InstanceStatusOverrideRule rule = getInstanceInfoOverrideRule()
batchWorkQueue.add(holders)
private EurekaServerConfig eurekaServerConfig(EurekaServerConfigBean)
将该租约实例放入到最近改变的队列中
随机摘除toEvict个实例
leaseToCancel == null
获取当前时间
newNodeList.remove(i); eurekaNode.shutDown();
从队列中获取任务,超时时间1ms
Cancel
initRemoteRegionRegistry()
创建一个状态改变的监听器并注册到applicationInfoManager
释放锁,返回单个处理的任务队列
ProcessingResult result = processor.process(tasks)(ReplicationTaskProcessor)
获取已存在的租约实例信息
EurekaMonitors.registerAllStats()
discoveryClient.refreshInstanceInfo()
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1
int statusCode = response.getStatusCode()
重新赋值
初始化缓存
ID id = taskHolder.getId()
run()(BatchWorkerRunnable)
检查EIP是否已绑定到实例
this.numberOfReplicationsLastMin.start()
lastDirtyTimestamp
expiredTasks++;
!scheduler.isShutdown()
进行节点初始化
lease.isExpired(additionalLeaseMs) && lease.getHolder() != null
EurekaServerAutoConfiguration
Y,注册具有给定持续时间的新实例
existingLease != null && (existingLease.getHolder() != null)
当任务列表小于批处理的大小并且要处理的任务列表不为空
EnableDiscoveryClientImportSelector
如果任务的过期时间=当前时间,说明这个任务已经到期还没有执行,expiredTasks加一,用来统计过期的任务数量
this.replicationClient = replicationClient
@Import(EurekaServerInitializerConfiguration.class)
singleItemWorkRequests.tryAcquire(1)
drainInputQueues()
初始化调度任务
创建一个租约实例对象,存活时间默认为90s
如果有必要刷新中心数据信息
publish(new EurekaServerStartedEvent(getEurekaServerConfig()))
!toShutdown.isEmpty()
this.expectedNumberOfClientsSendingRenews = count
判断线程是否中断停止
isEIPBound = isEIPBound()
eurekaServerBootstrap.contextInitialized(\t\t\t\t\t\tEurekaServerInitializerConfiguration.this.servletContext)
shouldUseReadOnlyResponseCache
List<T> tasks = getTasksOf(holders)
int retries = serverConfig.getEIPBindRebindRetries()
expiredTasks++
holders.add(holder)
holder.getExpiryTime() > now
绑定成功,直接return并且将定时任务执行时间改为5分钟一次
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>()
pendingTasks.containsKey(id)
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis()
setServletContext(ServletContext servletContext) this.servletContext = servletContext;
reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus())
InstanceInfoReplicator.this.run()
执行AcceptorRunner的run方法
private EurekaClientConfig eurekaClientConfig(EurekaClientConfigBean)
@PostConstruct
判断是否允许客户端从服务端拉取服务,默认为true
实现了SmartLifecycle接口的bean,会调用start()方法
往容器里添加一个Marker的bean实例
没绑定成功
ApplicationsResource
创建一个核心线程数为1最大线程数为2并设置为后台线程的线程池心跳执行器heartbeatExecutor
long scheduleTime = 0
将请求放入队列
获取重试次数,默认是3
isRegisterable(instance)
Applications apps = eurekaClient.getApplications()
初始化client信息,本地的服务副本信息
i > 0
!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()
toShutdown.contains(eurekaNode.getServiceUrl())
apps/ GET
获取注册表里的服务信息
获取心跳发送时间间隔,默认为30s
currentPayload != null
registry.clearRegistry()
同步实例信息到其他节点
@Import
holders.isEmpty()
pendingTasks.remove(processingOrder.poll())
updatePeerEurekaNodes(resolvePeerUrls())
判断是否所有队列都为空
ID id = processingOrder.poll()
registerHealthCheck
run()(SingleTaskWorkerRunnable)
ReplicationList list = createReplicationListOf(tasks)
并将重试请求队列释放一些空间
如果状态为503
定义scheduleTime=0
@EnableEurekaServer
instanceInfo.setIsDirty()
定义更新集群节点的任务
从注册表获取实例信息
expiredLeases.add(lease)
toShutdown.isEmpty() && toAdd.isEmpty()
DiscoveryClient
applicationInfoManager.refreshDataCenterInfoIfRequired()
AwsBindingStrategy bindingStrategy = serverConfig.getBindingStrategy()
获取本地应用信息
创建一个固定大小的定时线程池,线程的名字为Eureka-PeerNodesUpdater并设置为后台守护线程
!toAdd.isEmpty()
没有过期加入到批处理的列表中
由于客户端希望注册这个新的实例,因此增加发送续订的客户端数量
开启一个定时任务,更新只读缓存默认30s更新一次
获取过期服务数量和应该摘除数量的最小值
spring.factories
调用批处理分发器处理实例状态的更新任务
processingOrder.add(taskHolder.getId())
注册健康检查
设置修改标志以便下次心跳时能发现该实例
eureka引入overriddenstatus用来解决状态被覆盖问题客户端调用updateStatus方法时,同时更新server端实例的status和overriddenStatus状态客户端调用renew方法时,也要更新server端实例的status和overriddenstatus状态,但是有一下规则的(1):如果客户端上传的实例状态是down或者starting,表明客户端是重启或者healthCheck失败。此时这个实例不能作为服务提供服务。因此即使客户端调用updateStatus把实例状态更新为up,也是没用的。此时客户端实例的准确状态就是down或者starting。(2):如果客户端的实例是up或者out_of_service,此时是不可信的。有可能client端的实例状态已被改变,此时要使用overriddenstatus状态作为当前实例的状态,避免被覆盖。(3):(2)中的overriddenstatus有可能不存在,缓存失效,此时要使用server端已经存在的实例的状态。
实例化一个绑定端的代理对象
开始执行numberOfReplicationsLastMin定时更新 默认60s一次每分钟统计当前一分钟接受了多少个节点同步并存储起来
int count = 0;
获取将要添加的节点urls
Future latestPeriodic = scheduledPeriodicRef.get()
batchingDispatcher.process(taskId(\"heartbeat\
@PostConstruct初始化
服务下线
监听状态改变事件调用notify
String targetHost = hostFromUrl(peerEurekaNodeUrl)
注册所有eureka的监听信息
后台任务线程未关闭
构建一个批处理的任务执行器
初始化eureka服务上下文信息
getApplicationsInternal(\"apps/delta\
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs()
优先用批处理消费pendingTasks队列,批处理执行的条件是:pendingTasks队列长度大于maxBufferSize(10000),或pendingTasks队列中的任务等待时间大于maxBatchingDelay(500ms)经过批处理消费过后,如果pendingTasks队列中还有数据(没有达到批处理条件/一次批处理没有消费完),就用单次处理消费
count变量加1
自动服务故障移除中服务实例摘除的数量不会超过15%
Long dirtyTimestamp = instanceInfo.isDirtyWithTime()
创建心跳续约的任务heartbeatTask并监听
0 条评论
下一页