Flink SQL执行流程源码(Parse-->Validate-->Optimize-->Execute)
2022-04-08 17:27:53 16 举报
Flink SQL执行流程源码(Parse-->Validate-->Optimize-->Execute)
作者其他创作
大纲/内容
1
case BACK_TICK:
在将 SQL 语句解析成 Operation 后,为了得到 Flink 运行时的具体操作算子,需要进一步将 ModifyOperation 转换为 Transformation,转换的流程主要分为四个部分,即 1)将 Operation 转换为 另外一种RelNode,2)优化 RelNode,3)转换成 ExecNode,4)转换为底层的 Transformation 算子
以RelNodeBlock为单位进行优化,在子类中实现,StreamCommonSubGraphBasedOptimizer,BatchCommonSubGraphBasedOptimizer
Optimize
val optimizedRelNodes = optimize(relNodes)
生成transformations
SQL 转换及优化开始
FlinkSqlParserImpl.switchTo()
1、用config里边得到Factory2、Factory生成的解析器的抽象基础类
4、Optimize:(2)SQL 转换及优化
return
def translateToPlan(planner: E): Transformation[T] = { if (transformation == null) { transformation = translateToPlanInternal(planner) } transformation }
parser.switchTo(\"BTID\");
1
递归各个节点,转换成transformationnode.translateToPlan(planner)
2
Flink源码(1.11.x)——Flink SQL执行流程源码(Parse-->Validate-->Optimize-->Execute)
val sinkBlocks = doOptimize(roots)
parser.parse(query);
执行sql
创建一个Table环境
CalciteParser.parse()
optimizeBlock()
StreamTableEnvironmentImpl
toQueryOperation()
底层是通过JavaCC生成的解析器去解析SQL。过程非常复杂返回sqlNode(抽象语法树)
Operation operation = operations.get(0);
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter()
toDataStream()
ExecNode.translateToPlan()
val relNodes = modifyOperations.map(translateToRel)
SqlCall.accept()
SqlValidatorImpl.validate()
CommonSubGraphBasedOptimizer.optimize()
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery()
StreamPlanner.translateToPlan()
SqlToOperationConverter.convert()
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
依赖calcite将sql语句解析为抽象语法树sqlNode
this.token_source.SwitchTo(state);
3
visitor.visit(this);
创建关系树转化器
org.apache.calcite.sql.parser.SqlParser
返回 transformations
executionEnvironment.addOperator(transformation);
FlinkStreamProgram.buildProgram()
return optimizedRelNodes
def validate()
val inputTransform = getInputNodes.get(0).translateToPlan(planner) .asInstanceOf[Transformation[RowData]]
TableEnvironmentImpl.sqlQuery()
StreamCommonSubGraphBasedOptimizer.doOptimize()
override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[RowData] = { val transformations = getInputNodes.map { input => input.translateToPlan(planner).asInstanceOf[Transformation[RowData]] } new UnionTransformation(transformations) }
返回RelRoot
translateToRel()
return sqlNode
val optimizedRelNodes = getOptimizer.optimize(relNodes)
递归
将 RelNode 封装成 PlannerQueryOperation
Parse:语法解析,把 SQL 语句转换成为一个抽象语法树(AST),在 Calcite 中用 SqlNode 来表示;Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是 SqlNode 构成的语法树;Optimize:查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
返回 transformation
返回优化后的node
override def optimize(roots: Seq[RelNode]): Seq[RelNode] = { //以RelNodeBlock为单位进行优化,在子类中实现,StreamCommonSubGraphBasedOptimizer,BatchCommonSubGraphBasedOptimizer val sinkBlocks = doOptimize(roots) //获得优化后的逻辑计划 val optimizedPlan = sinkBlocks.map { block => val plan = block.getOptimizedPlan require(plan != null) plan } //将 RelNodeBlock 使用的中间表展开 expandIntermediateTableScan(optimizedPlan) }
ParserImpl.parse
开始执行任务1、生成四种Graph2、部署Task
def translateToPlan(planner: E): Transformation[T] = { if (transformation == null) { transformation = translateToPlanInternal(planner) } transformation }
Validate
StreamExecUnion.translateToPlan()
3、Optimize:(1)查询计划优化
循环优化逻辑子树
FlinkPlannerImpl.rel()
转换sql
ExecNode.translateToPlanInternal()
FlinkSqlParserImpl.parseSqlStmtEof()
optimizeTree()
SqlNode parsed = parser.parse(statement);
使用ModifyOperation来创建Table
PreValidateReWriter.visit()
PlannerBase.optimize()
StreamExecCalc.translateToPlanInternal()
Table result = = tEnv.sqlQuery(\"select * from ****\")
parseQuery()
public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); String planner = params.has(\"planner\") ? params.get(\"planner\") : \"blink\
校验表属性、字段、表的存在性
create()
1)将 Operation 转换为 另外一种RelNode
Execute
parser.parseSqlStmtEof()
return operations
case outputConversion: OutputConversionModifyOperation
PlannerBase.translate()
val execNodes = translateToExecNodePlan(optimizedRelNodes)
Parse
解析sql
return Optional.of(converter.convertSqlQuery(validated));
2、Validate:语法校验校验之后仍然是 SqlNode 构成的语法树
// 1)将 Operation 转换为 另外一种RelNode val relNodes = modifyOperations.map(translateToRel) // 2)优化 RelNode val optimizedRelNodes = optimize(relNodes) // 3)转换成 ExecNode val execNodes = translateToExecNodePlan(optimizedRelNodes) // 4)转换为底层的 Transformation 算子 translateToPlan(execNodes)
translateToExecNodePlan()
SqlAbstractParserImpl parser = config.parserFactory().getParser(reader)
返回relNodes即逻辑计划图(Logical Planning)
return new PlannerQueryOperation(relational.rel);
2)优化 RelNode
3)转换成 ExecNode(物理执行计划)
return createTable((QueryOperation) operation);
transformation转换完成
parse()
final SqlNode validated = flinkPlanner.validate(sqlNode);
Flink应用服务主类启动Start
Execute:将逻辑查询计划翻译成物理执行计划
optimizedRelNodes 是flink逻辑执行计划
转换结束
1、Parse:语法解析
rel()
convertSqlQuery() 使用 SqlToRelConverter 将 SqlNode 转换成 RelNode
返回table
return parser.parseStmt();
4)转换为底层的 Transformation 算子
return parser
this.transformations.add(transformation);
创建parser
RelRoot relational = planner.rel(validated);
return ret;
toAppendStream()
FlinkPlannerImpl.validate()
SqlNode stmt = this.SqlStmt();this.jj_consume_token(0);return stmt;
创建table和SQL的入口点和中心上下文环境StreamTableEnvironment(Blink Planner )
参考:Task部署、Task间数据交换、Task(线程)复用 源码流程图
预校验结合元数据验证 Sql 的合法性
转换为关系树使用 SqlToRelConverter 将 SqlNode 转换成 RelNode
val programs = calciteConfig.getStreamProgram .getOrElse(FlinkStreamProgram.buildProgram(config.getConfiguration))
返回validated
设置优化规则(物理执行计划和逻辑执行计划)
如果:validated.getKind().belongsTo(SqlKind.QUERY))
env.execute();
FlinkSqlParserImplTokenManager.SwitchTo()
translateToPlan(execNodes)
返回Collections.singletonList(operation);
0 条评论
下一页