Spark-SQL
2023-03-27 15:34:56 0 举报
AI智能生成
对spqrk-sql的详细总结,根据本人工作经验进行重点总结
作者其他创作
大纲/内容
概述
简介
用于处理结构化数据的模块。对于开发人员来讲,Spark SQL可以简化RDD的开发,提高开发效率,提供了两个抽象数据模型,类似于SparkCore中的RDD,即DataFream和DataSet。
特点
1、Integrated:易整合,讲SQL查询与Spark程序无缝混合。
2、Uniform data access:统一数据访问,以相同的方式连接到任何数据源。
3、Hive integration:兼容 Hive,支持在现有的 Hive 仓库上运行 SQL 或 HiveQL 查询。Spark SQL 支持 HiveQL 语法以及
Hive SerDes 和 UDF,可以直接访问现有的 Hive 仓库。
4、Standard connectivity:标准连接,提供了行业标准的 JDBC 和 ODBC 连接。
2、Uniform data access:统一数据访问,以相同的方式连接到任何数据源。
3、Hive integration:兼容 Hive,支持在现有的 Hive 仓库上运行 SQL 或 HiveQL 查询。Spark SQL 支持 HiveQL 语法以及
Hive SerDes 和 UDF,可以直接访问现有的 Hive 仓库。
4、Standard connectivity:标准连接,提供了行业标准的 JDBC 和 ODBC 连接。
数据模型
DataFrame
代表一个不可变的分布式数据集合(一种数据结构),是Spark对需要处理的数据的抽象。其核心目的是让开发者面对数据处理时,只关心要做什么,而不用关心怎么去做。
解析:SQL Player将SQL通过词法、语法解析生成Unresolved Logical Plan(未绑定的逻辑执行计划)
编译:Analyzer(分析器)使用 Analysis Rules,配合元数据(Catalog 信息或 Hive Metastore 信息)将 Unresolved
Logical Plan 编译成 Analyzed Logical Plan(编译后的逻辑计划);
优化:Logical Optimizer(逻辑计划调优器)使用一些 Optimization Rules(调优规则:合并、列裁剪、谓词下推等)
将 Analyzed Logical Plan 优化成 Optimized Logical Plan(优化后的逻辑计划);
执行:Physical Planner(物理计划生成器)使用 Planning Strategies 将前面的 Optimized Logical Plan(还不能被 Spark
执行)生成可执行的物理计划 Physical Plan。这个过程是把 Logical Plan 转换成多个 Physical Plans,根据过去的性能统
计数据,利用代价模型(Cost Model),选择最佳的物理执行计划。
最后通过 Code Generation(代码生成器)把 SQL 查询生成 Java 字节码(代码生成阶段)。最终执行。
解析:SQL Player将SQL通过词法、语法解析生成Unresolved Logical Plan(未绑定的逻辑执行计划)
编译:Analyzer(分析器)使用 Analysis Rules,配合元数据(Catalog 信息或 Hive Metastore 信息)将 Unresolved
Logical Plan 编译成 Analyzed Logical Plan(编译后的逻辑计划);
优化:Logical Optimizer(逻辑计划调优器)使用一些 Optimization Rules(调优规则:合并、列裁剪、谓词下推等)
将 Analyzed Logical Plan 优化成 Optimized Logical Plan(优化后的逻辑计划);
执行:Physical Planner(物理计划生成器)使用 Planning Strategies 将前面的 Optimized Logical Plan(还不能被 Spark
执行)生成可执行的物理计划 Physical Plan。这个过程是把 Logical Plan 转换成多个 Physical Plans,根据过去的性能统
计数据,利用代价模型(Cost Model),选择最佳的物理执行计划。
最后通过 Code Generation(代码生成器)把 SQL 查询生成 Java 字节码(代码生成阶段)。最终执行。
DataSet
Spark1.6中添加的新接口,是DataFrame的扩展,它具有RDD的优点以及Spark SQL的优化执行引擎的优点。DataSet也可以使用转换函数。另外,它可以减少内存的使用。
在 Scala 和 Java 中,Row 类型的 Dataset 代表 DataFrame,即 Dataset[Row] 等同于 DataFrame。
在 Scala 和 Java 中,Row 类型的 Dataset 代表 DataFrame,即 Dataset[Row] 等同于 DataFrame。
基本使用
SQL
createTempView
创建一张会话级别临时表,这张表的生命周期是当前会话,一旦创建这张表的会话关闭,这张表会立马消失,相同表名只能创建一次,否则报错。
createGlobalTempView
创建一张全局级别临时表,这张表的生命周期是整个 Spark 应用程序,只要 Spark 应用程序不关闭,这张临时表就依然可用,并且这张表对其他的 SparkSession 共享。相同表名只能创建一次,否则报错。
createOrReplaceTempView
创建一张会话级别临时表,这张表的生命周期是当前会话,一旦创建这张表的会话关闭,这张表会立马消失。创建相同表名会覆盖,不会报错。
createOrReplaceGlobalTempView
创建一张全局级别临时表,这张表的生命周期是整个 Spark 应用程序,只要Spark 应用程序不关闭,这张临时表就依然可用,并且这张表对其他的 SparkSession 共享。创建相同表名会覆盖,不会报错。
模型互转
RDD转DataFrame
scala> val df = rdd.toDF
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
DataFrame = [id: int, name: string ... 2 more fields] :表示 DataFrame 模型的数据类型为一行数据
( Row ),这是弱数据类型的体现
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
DataFrame = [id: int, name: string ... 2 more fields] :表示 DataFrame 模型的数据类型为一行数据
( Row ),这是弱数据类型的体现
DataFrame转RDD
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at rdd at <console>:26
因为 DataFrame 模型的数据类型为 Row (弱数据类型),所以 RDD[org.apache.spark.sql.Row] 。
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at rdd at <console>:26
因为 DataFrame 模型的数据类型为 Row (弱数据类型),所以 RDD[org.apache.spark.sql.Row] 。
RDD转DataSet
scala> val ds = rdd.toDS
ds: org.apache.spark.sql.Dataset[User] = [id: int, name: string ... 2 more fields]
Dataset[User] :表示 Dataset 模型的数据类型为 User,这就是强数据类型的体现。
ds: org.apache.spark.sql.Dataset[User] = [id: int, name: string ... 2 more fields]
Dataset[User] :表示 Dataset 模型的数据类型为 User,这就是强数据类型的体现。
DataSet转RDD
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[132] at rdd at <console>:26
因为 Dataset 模型的数据类型为 User (强数据类型),所以 RDD[User] 。
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[132] at rdd at <console>:26
因为 Dataset 模型的数据类型为 User (强数据类型),所以 RDD[User] 。
DataFrame转DataSet
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, gender: bigint ... 2 more fields]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, gender: bigint ... 2 more fields]
DataSet转DataFrame
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
0 条评论
下一页