Eureka源码(七)— 三层队列任务批处理机制
2022-05-10 10:20:25 1 举报
Eureka源码, 三层队列任务批处理机制
作者其他创作
大纲/内容
获得实际批量任务
调用 #drainInputQueues() 方法循环处理完输入队列( 接收队列 acceptorQueue + 重新执行队列 reprocessQueue ),直到有 待执行的任务pendingTasks
PeerEurekaNodes初始化
<<interface>>TaskProcessor
+ process(task : T) : ProcessingResult+ process(List<T>) : ProcessingResult
初始化任务处理器
AcceptorExecutor
内部抽象类
处理完重新执行队列 reprocessQueue
细箭头:任务执行经历的操作粗箭头:任务队列流转的方向
调用 任务分发器 TaskDispatcher#process() 方法
相关代码
初始化任务接收器的同时,也会初始化AcceptorRunner(任务接收线程,调度任务),代码流程后面讲解
处理完接收队列 acceptorQueue
SingleTaskWorkerRunnable
+ run() : void
workQueue
<<interface>>TaskDispatcher
任务接收执行器【处理任务】
任务1
任务持有者 TaskHolder
updatePeerEurekaNodes(resolvePeerUrls())
ReplicationTaskProcessor
任务的执行拆分了三层队列:第一层:接收队列【acceptorQueue】,重新处理队列【reprocessQueue】 绿箭头:分发器在收到任务执行请求后,提交到接收队列,任务实际未执行 蓝箭头:执行器的工作线程处理任务失败,将符合条件的失败任务提交到重新执行队列第二层:待执行队列【processingOrder】 紫箭头:接收线程(Runner)将重新执行队列,接收队列提交到待执行队列第三层:工作队列【workQueue】 紫箭头:接收线程(Runner)将待执行队列的任务根据参数(maxBatchingSize)将任务合并成批量 任务,调度(提交)到工作队列 蓝箭头:执行器的工作线程池,一个工作线程可以拉取一个批量任务进行执行
TaskDispatchers
任务持有者(任务)
后台线程执行 任务接收线程 AcceptorRunner#run() 方法,调度任务
1.获取任务处理
EurekaBootStrap初始化
Task
批量任务工作后台线程( BatchWorkerRunnable )执行 #run() 方法,调度任务
2.添加到 pendingTasks && processingOrder
~ id : ID~ task : T~ expiryTime : long~ submitTimestamp : long
reprocessQueue
3
PeerEurekaNode初始化
任务接收器
调用 任务处理器 TaskProcessor 执行任务。当任务执行结果为 Congestion 或 TransientError ,调用 任务接收器 AcceptorExecutor#reprocess() 提交整个批量任务重新处理
Eureka-Server 集群通过任务批处理同步应用实例注册实例在服务注册,取消,故障等情况会调用replicateToPeers方法同步应用实例
1
任务执行流程
初始化任务执行器
TaskProcess
任务3
TaskDispatcher
结束
任务2
初始化任务接收器(AcceptorExecutor)
调用 TaskDispatcher#requestWorkItems() 方法,发起请求信号量,并获得批量任务的工作队列
调度批量任务,将任务添加到 批量任务工作队列 batchWorkQueue
3.分派任务到workQueue
将 ReplicationTaskProcessor 和 AcceptorExecutor 作为参数去初始化批量任务执行器 taskExecutor
调用 任务接收器 AcceptorExecutor#process() 方法,添加任务到接收任务队列acceptorQueue
函数式,完整写法如下
WorkerRunnableFactory 创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建单任务工作线程SingleTaskWorkerRunnable,方法#singleItemExecutors()批量任务工作线程BatchWorkerRunnable,方法#batchExecutors()
ReplicationTaskProcessor 重写 #process() 方法里面是处理单任务和批量任务的逻辑,调用其他Eureka Server的逻辑
pendingTasks&&processingOrder
以 注册操作批量处理代码 为例(单任务处理代码类似)
serverContext.initialize()
processingOrder
批量任务工作线程【执行任务】
workerThreads 属性,工作线程池。工作任务队列会被工作线程池并发拉取,并发执行WorkerRunnable 任务工作线程抽象类BatchWorkerRunnable(代码流程后面讲解) 和 SingleTaskWorkerRunnable 都实现该类,差异在 #run() 的自定义实现
初始化taskExecutor时调用构造器,创建 工作线程池
任务分发器
2.处理失败的任务,添加会reprocessQueue
acceptorQueue
ReplicationTaskProcessor 实现 TaskProcessor 接口
调度任务
初始化任务工作后台
2
TaskExecutors
AcceptorRunner初始化
AcceptorRunner
TrafficShaper 网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。
taskExecutor
初始化任务分发器
初始化任务分发器(TaskDispatcher)
workerThread
1.接收 acceptorQueue && reprocessQueue
初始化任务接收器
batchingDispatcher#process()
#process() 提交任务
集群注册表同步调用下面方法#replicateInstanceActionsToPeers然后调用PeerEurekaNode#register()
任务接收线程【调度任务】
三层队列的好处:1、接收队列,避免处理任务的阻塞等待2、接收线程(Runner)合并任务,将相同任务编号的任务合并,只执行一次3、Eureka-Server为集群同步提供批量操作多个应用实例的接口,一个匹狼任务可以一次调度接口完成,避免多次调用的开销。当然,这样做的前提是合并任务,这也导致Eureka-Server集群之间对实例的注册和下线带来了更大的延迟毕竟Eureka是在CAP之间,选择了AP【一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)】
batchingDispatcher
WorkerRunnable
循环,直到同时满足如下全部条件:1、重新执行队列( reprocessQueue ) 和接收队列( acceptorQueue )为空2、待执行任务映射( pendingTasks )不为空
WorkerRunnableFactoryBatchWorkerRunnable执行任务
peerEurekaNodes.start()
createPeerEurekaNode()
任务执行器
初始化任务执行器(TaskExecutors)
<<enum>>TaskProcessor
+ Success : Enum+ Congestion : Enum+ TransientError : Enum+ PermanentError : Enum
- isShutdown : AtomicBoolean- workerThreads : List<Thread>
<<interface>>WorkerRunnableFactory
create(idx : int) : font color=\"#f44336\
任务处理器
具体的代码逻辑在下方
优先从重新执行任务的队尾拿较新的任务,从而实现保留更新的任务在待执行任务映射( pendingTasks ) 里添加任务编号到待执行队列( processingOrder ) 的头部,如下图
BatchWorkerRunnable
注册调度#replicateToPeers
开始
AcceptorRunner是AcceptorExecutor的内部类
初始化任务处理器(TaskProcessor)
线程数组
内部类
内部接口
TaskDispatchers 任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现;批量任务执行的分发器,用于 Eureka-Server 集群注册信息的同步任务单任务执行的分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态
获取批量任务 调用 getWork() 方法,获取一个批量任务直到成功
涉及的类
继承
#process() 添加到acceptorQueue
0 条评论
下一页