trino--调度---DAG
2023-07-08 21:07:41 0 举报
trino Push 调度流程
作者其他创作
大纲/内容
StageScheduler
等待第一个完成的 Stage
ListenableFuture<V> whenAnyComplete( Iterable<? extends ListenableFuture<? extends V>> futures)
A
font color=\"#569230\
stageExecutions 循环调用
执行计划演进过程 Stages
CoordinatorStagesScheduler
当前线程执行
void schedule()
StageStateMachine
StateMachine<StageState> stageState
PLANNED --> SCHEDULING
SqlStage
10:为每个 StageExecution 创建一个 RemoteTask
Optional<RemoteTask> createTask
PipelinedStageExecution
StagesScheduleResult
Set<StageExecution> stagesToSchedule
为每个节点,创建一个 RemoteTask
ScheduleResult schedule()
DistributedStagesScheduler
PhasedExecutionSchedule
触发已经就绪的 Fragment 的目标 Fragment
Set<PlanFragmentId> unblockStagesWithFullOutputBuffer()
Worker1
Worker2
Worker3
1:清除已经完成的 Stages2:返回所有要执行的 Fragments
Set<PlanFragmentId> removeCompletedStages()
Graph.removeVertex()方法用于从图中移除一个顶点及其相关边。举个示例:Graph<String> graph = Graph.create();graph.addVertex(\"A\");graph.addVertex(\"B\");graph.addVertex(\"C\");graph.addVertex(\"D\");graph.addEdge(\"B\
Fragment
DistributedStagesScheduler
控制 DAG 的执行
void schedule()
策略见:https://www.processon.com/diagraming/641d16b5f10804568d59c13a
InternalNode: coordinator --> NodeScheduler 选出partition:0initialSplits:nil
blockedStages
被调度,未完成的 Stages
1:N
RemoteTask
@ForQueryExecution
newCachedThreadPool(query-execution)
CoordinatorModule
D
HttpRemoteTask
Executor executor
void start()
SqlQueryExecution
ExecutorService queryExecutor
ExchageNode
HttpRemoteTasks
为每个参与计算的 Worker 节点,创建一个
B
List<StageExecution> stageExecutions
Fragments
SqlQueryExecutionFactory(SINGLETON)
任务有两个通信链接: 任务 <-> 协调器(用于状态更新) 任务 <-> 下游任务(用于交换任务结果)在下游任务与任务之间的链接中断的情况下(而任务与协调器之间的链接没有中断),如果没有启用故障恢复,下游任务将发现通信链接中断并且会导致查询失败。然而,如果启用了故障恢复,下游任务将被配置为忽略失败以便在上游任务失败时继续运行。这可能导致\"死锁\",因为协调器认为任务是活动的,但是由于任务与下游任务之间的通信链接中断,没有人在获取结果,导致任务处于阻塞状态。因此,将此类通信故障通知调度器非常重要,以便调度器能够做出反应并重新调度任务。目前,只有\"协调器\"任务必须在上游任务失败时继续运行(例如执行表提交的任务)。重新启动表提交任务会引入另一组挑战(例如确保提交操作始终是幂等的)。如果需要在不同节点上分离调度和协调器任务,就必须实现一个用于此通知的RPC机制。注意:对于没有任何协调器阶段的查询,情况仍然类似。拉取最终查询结果的交换客户端如果在交换客户端和一个输出任务之间的通信链接中断,也必须传播同样的通知。
Workers
HttpRemoteTaskFactory(SINGLETON)
ExecutorService coreExecutor
PipelinedQueryScheduler
ExecutorService executor 线程池
PhasedExecutionSchedule
Set<StageExecution> activeStages
span style=\
Set<PlanFragmentId> removeCompletedStage( StageExecution stage)
ExecutionSchedule executionSchedule
循环直到SQL执行完成
QueryExecutionFactoryModule
public StagesScheduleResult getStagesToSchedule()
HttpRemoteTaskFactory
RemoteTask createRemoteTask
C
RemoteTaskFactory
Split 为空
stageScheduler
RemoteTask1
RemoteTask2
RemoteTask3
把要执行的Fragment 对应的 Stage 按顺序加入到 activeStages
void schedule()
newCachedThreadPool(remote-task-callback)
0 条评论
下一页