spark源码之DAGScheduler
2019-07-18 11:32:34 0 举报
AI智能生成
spark源码的DAGScher解读
作者其他创作
大纲/内容
0.包路径
包:org/apache/spark/scheduler/DAGScheduler.scala
1.提交Job的入口
runJob()
runJob()
1)生成job的启动时间
2)submitJob()
提交job,执行Job过程是异步的,
因此submitJob()将立即返回JobWaiter对象
提交job,执行Job过程是异步的,
因此submitJob()将立即返回JobWaiter对象
3)利用JobWaiter等待Job处理完毕,成功:打印日志,失败:日志+异常
2.DAGScheduler调度的核心入口
handleJobSubmitted()
handleJobSubmitted()
1)调用createResultStage(),创建ResultStage
2)创建ActiveJob。
3)调用clearCacheLocs方法(见代码清单7-23 )清空cacheLocs。
4)生成Job提交的时间。
5)将jobld与刚创建的ActiveJob之间的对应关系放人jobldToActiveJob中。
6)将刚创建的ActiveJob放人activeJobs集合中。
7 )使ResultStage的。activeJob 属性持有刚创建的ActiveJob。
3)调用clearCacheLocs方法(见代码清单7-23 )清空cacheLocs。
4)生成Job提交的时间。
5)将jobld与刚创建的ActiveJob之间的对应关系放人jobldToActiveJob中。
6)将刚创建的ActiveJob放人activeJobs集合中。
7 )使ResultStage的。activeJob 属性持有刚创建的ActiveJob。
8)获取当前Job的所有Stage对应的StageInfo (即数组stagelnfos)。
9)向LiveListenerBus投递SparkL itenerlobSart事件,进而引发所有关注此事件的监
听器执行相应的操作。
9)向LiveListenerBus投递SparkL itenerlobSart事件,进而引发所有关注此事件的监
听器执行相应的操作。
10)调用submitage(),
提交ResultStage
---stage划分算法的入口
提交ResultStage
---stage划分算法的入口
01.activeJobForStage(),找到当前stage的所有ActiveJob的身份标识
02.若存在01,判断stage是否还未提交,然后进行以下操作----stage划分算法的精髓
①调用getMissingParentStages(),获取当前所有未提交stage的父Stage
②如果不存在未提交的父stage,则调用submitMissingTasks()提交当前所有未提交的额Task
否则,递归调用submitStage(),提交所有未提交的父stage,并将当前stage加入waitingStages,
(表示当前stage必须等待所有的父stage执行完成)
①调用getMissingParentStages(),获取当前所有未提交stage的父Stage
②如果不存在未提交的父stage,则调用submitMissingTasks()提交当前所有未提交的额Task
否则,递归调用submitStage(),提交所有未提交的父stage,并将当前stage加入waitingStages,
(表示当前stage必须等待所有的父stage执行完成)
getMissingParentStages()----stage的划分算法
如果stage最后一个rdd的所有依赖,都是窄依赖,那么就不会创建任何新的stage
但是只要发现这个stage的rdd宽依赖了某个rdd,那么
用宽依赖的那个rdd,创建一个新的stage,然后立即将新的stage返回
如果stage最后一个rdd的所有依赖,都是窄依赖,那么就不会创建任何新的stage
但是只要发现这个stage的rdd宽依赖了某个rdd,那么
用宽依赖的那个rdd,创建一个新的stage,然后立即将新的stage返回
submitMissingTasks(),
提交stage,为stage创建一批task,task数量与partiton数量相同
提交stage,为stage创建一批task,task数量与partiton数量相同
03.若不存在01,则调用abortStage()终止依赖于当前Stage的所有Job
3.构建stage
创建ResultStage的方法
createResultStage()
createResultStage()
获取或创建父stage的列表
getOrCreateParentStages()
getOrCreateParentStages()
getShuffleDependencies
获取RDD所有shuffleDependency的序列,
逐个访问每个RDD及其依赖的非shuffle的RDD,
获取所有非shuffle的RDD的shuffleDependency
获取RDD所有shuffleDependency的序列,
逐个访问每个RDD及其依赖的非shuffle的RDD,
获取所有非shuffle的RDD的shuffleDependency
getOrCreateShuffleMapStage
为每一个ShuffleDependency获取或者创建对应的ShuffleMapStage
为每一个ShuffleDependency获取或者创建对应的ShuffleMapStage
job包含多个stage,
划分方式从Resultstage开始从后往前边划分边创建
划分方式从Resultstage开始从后往前边划分边创建
生成stage身份标识
将ResultStage注册到stageTdToStage中
调用updateJobIdStageIdMaps(),
更新Job的身份标识与ResultStage机器祖先的映射关系
更新Job的身份标识与ResultStage机器祖先的映射关系
stage划分算法总结:
1.从fianlkStage倒推
2.通过宽依赖来进行stage的划分
3.使用递归有限提交父stage
1.从fianlkStage倒推
2.通过宽依赖来进行stage的划分
3.使用递归有限提交父stage
收藏
0 条评论
下一页