SparkSQL Plan Generation Process
Outline / Content
QueryExecution
lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) planner.plan(ReturnAnswer(optimizedPlan)).next() }
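To see these phases from user code, the QueryExecution of a Dataset can be inspected directly. A minimal sketch (assumes a local SparkSession; the view name t and the query are just examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("qe-demo").getOrCreate()
import spark.implicits._
Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("t")

val df = spark.sql("SELECT id, count(*) AS cnt FROM t GROUP BY id")
val qe = df.queryExecution      // the QueryExecution built for this query
println(qe.analyzed)            // lazy val analyzed: LogicalPlan
println(qe.optimizedPlan)       // lazy val optimizedPlan: LogicalPlan
println(qe.sparkPlan)           // lazy val sparkPlan: SparkPlan, i.e. planner.plan(ReturnAnswer(optimizedPlan)).next()
println(qe.executedPlan)        // lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)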
SparkPlan
def executeCollect(): Array[InternalRow] = { val byteArrayRdd = getByteArrayRdd() ..... byteArrayRdd.collect().foreach { countAndBytes => ..... } }
Generate QueryExecution
HashAggregateExec...
lazy val analyzed: LogicalPlan
SessionState
lazy val sessionState: SessionState
Anonymous function action: SparkPlan => U, i.e. def action(s: SparkPlan): Seq[InternalRow] = { s.executeCollect() }
lazy val analyzed: LogicalPlan
xxCommand--->DataWritingCommand || RunnableCommand--->Command
SparkSession
def sql(sqlText: String): DataFrame
FileFormatWriter
def write(...){ plan.execute() ..... sparkSession.sparkContext.runJob(....)}
df.explain()
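explain(true) prints the parsed, analyzed and optimized logical plans plus the physical plan, while explain() prints only the physical plan. Usage sketch, continuing the df from the sketch above:

df.explain()       // == Physical Plan == only
df.explain(true)   // parsed / analyzed / optimized logical plans + physical plan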
def write(...){ ...... sparkSession.sparkContext.runJob(....)}
SparkPlanner
override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: Aggregation :: JoinSelection :: InMemoryScans :: BasicOperators :: Nil)
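extraStrategies is the user hook that SparkPlanner consults first. A minimal sketch of plugging one in (assumes the spark session from the sketch above; MyStrategy is a made-up, no-op example that returns Nil so planning falls through to the built-in strategies):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  // Returning Nil means "no match here", so the planner tries the next Strategy.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = MyStrategy :: Nil   // ends up in experimentalMethods.extraStrategies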
Command interface: a logical node representing a non-query command to be executed by the system. For example, the parser can use commands to represent DDL operations. Unlike queries, commands are executed eagerly.
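In user code this shows up as: a command statement runs inside spark.sql(...) itself, while a query only runs when an action is called. A small sketch (assumes the spark session from the sketch above; table names are examples only):

// Command node (e.g. DDL): executed eagerly by spark.sql(...)
spark.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT) USING parquet")

// Query node: spark.sql(...) only builds the plan, nothing runs yet
val q = spark.sql("SELECT name FROM student_infos")
q.count()   // the action triggers plan generation and execution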
Dataset
lazy val sparkPlan: SparkPlan
AbstractSqlParser
def parsePlan(sqlText: String): LogicalPlan
insert overwrite table good_student_infos select * from student_infos
BaseSessionStateBuilder
def createQueryExecution: LogicalPlan => QueryExecution = { plan => new QueryExecution(session, plan) }
FileSourceScanExec
def createBucketedReadRDD(...): RDD[InternalRow]
case ①
execute()
Parse the SQL text into a syntax tree with ANTLR4
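The parser can be exercised on its own: CatalystSqlParser (one concrete AbstractSqlParser) turns SQL text into an unresolved LogicalPlan, before any analysis. A small sketch (the table name is an example):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// parsePlan: the ANTLR4 lexer/parser builds the parse tree, which is converted to a LogicalPlan
val unresolved = CatalystSqlParser.parsePlan("SELECT name FROM student_infos WHERE age > 18")
println(unresolved)   // roughly 'Project / 'Filter / 'UnresolvedRelation: still unresolved, no catalog lookup yet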
case: sparkSession.read.parquet/orc/text(path), or sql("select ...") where the Hive table is stored as parquet/orc: val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
case: the data source is a Hive table stored as text
abstract class QueryPlanner
def plan(plan: LogicalPlan): Iterator[PhysicalPlan]
df.count() / df.first()
def createNonBucketedReadRDD(...): RDD[InternalRow]
execute() is called recursively down the plan until the first (leaf) RDD is created, producing the whole RDD chain
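A stripped-down, non-Spark sketch of this recursion (class names are illustrative only, not Spark's):

// execute() delegates to doExecute(); a non-leaf node calls child.execute() and wraps the
// child's result, so the chain is built bottom-up starting from the leaf scan node.
abstract class FakePlan {
  final def execute(): List[String] = doExecute()          // in Spark: final def execute(): RDD[InternalRow]
  protected def doExecute(): List[String]
}
class FakeScan(data: List[String]) extends FakePlan {
  protected def doExecute() = data                          // leaf: produces the first "RDD"
}
class FakeFilter(child: FakePlan, p: String => Boolean) extends FakePlan {
  protected def doExecute() = child.execute().filter(p)     // wraps child.execute(), like a mapPartitions over it
}

val chain = new FakeFilter(new FakeScan(List("a", "bb", "ccc")), _.length > 1)
println(chain.execute())   // List(bb, ccc)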
② case Union(children: Seq[LogicalPlan])
Generate SparkPlan
Generates
Dataset
Generate RDD[InternalRow]
HiveTableScanExec
def doExecute(): RDD[InternalRow]{ hadoopReader.makeRDDForTable(hiveQlTable) hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions))}
The partitions of the RDD produced by createBucketedReadRDD are determined by the buckets of the data source files
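One way to observe this (a sketch; assumes the spark session from the sketch above and that bucketed reads are enabled, which is the default; the table name is an example):

// Write a table with 8 buckets, then check the scan's partition count.
spark.range(0, 1000).selectExpr("id", "id % 10 AS key")
  .write.bucketBy(8, "key").sortBy("key").saveAsTable("bucketed_t")

val scanned = spark.table("bucketed_t")
println(scanned.rdd.getNumPartitions)   // expected: 8, one partition per bucket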
[DataWritingCommandExec || ExecutedCommandExec]--->SparkPlan
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
plan.execute()
final def execute(): RDD[InternalRow]
The SparkPlans defined in commands.scala (ExecutedCommandExec, DataWritingCommandExec)
....Exec ---->SparkPlan
def doExecute(): RDD[InternalRow] {child.execute() ...}
case ②
RDD
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
TODO
def doExecute(): RDD[InternalRow]
① case c: Command
lazy val withCachedData: LogicalPlan
③ case _ =>
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
byteArrayRdd.collect()
lazy val optimizedPlan: LogicalPlan
def executePlan(plan: LogicalPlan): QueryExecution
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] {val candidates = strategies.iterator.flatMap(_(plan))}
@transient private[sql] val logicalPlan: LogicalPlan
=
SparkSQL
LogicalPlan