SparkSQL
2024-06-23 21:11:49 1 举报
AI智能生成
SparkSQL是Apache Spark的一个功能模块,用于进行大规模结构化数据处理。它提供了类似于SQL的查询语言,允许用户方便地进行数据查询、转换和聚合操作。此外,SparkSQL还可以处理来自不同数据源(如Hive、Parquet、JSON等)的数据,以及支持用户定义函数(UDF)和窗口函数等高级特性。这使得SparkSQL在处理大规模数据集时具有高效性、灵活性和高可扩展性。
作者其他创作
大纲/内容
基本概念
➢Spark SQL是一个用来处理结构化数据的Spark框架
➢可被视为一个分布式的SQL查询引擎,并且提供了一个抽象的可编程数据模型DataFrame
➢Spark SQL可以直接处理RDD,也可以处理Parquet文件或者JSON文件,甚至可以处理外部数据库中的数据以及Hive中存在的表数据
DataFrame
概念
➢Spark SQL提供了一个抽象的编程数据模型DataFrame,DataFrame是由SchemaRDD发展而来的,从Spark 1.3.0开始,SchemaRDD更名为DataFrame。SchemaRDD直接继承自RDD,而DataFrame则自身实现RDD的绝大多数功能
➢可以将Spark SQL的DataFrame理解为一个分布式的Row对象的数据集合,该数据集合提供了由列组成的详细模式信息
DataFrame与Schema与RDD
RDD
特点
弹性分布式数据集:RDD是一个不可变的分布式集合,可以容忍故障并自动恢复
操作类型:支持两种类型的操作:转换操作(如map、filter)和行动操作(如collect、count)
类型安全:RDD中的每个元素都是类型化的,且类型在编译时得到检查
低级API:提供了更多对数据处理的控制,但编写起来相对繁琐,特别是对于复杂的查询和聚合操作
适用场景
需要精细控制数据分区和计算过程
需要处理非结构化或半结构化数据
高度定制化的计算操作
示例
val rdd = spark.sparkContext.parallelize(Seq((1, "Alice"), (2, "Bob"), (3, "Charlie")))
val namesRDD = rdd.map { case (id, name) => name }
namesRDD.collect().foreach(println)
val namesRDD = rdd.map { case (id, name) => name }
namesRDD.collect().foreach(println)
DataFrame
特点
结构化数据集:DataFrame是一个分布式的行列式数据集,类似于关系数据库的表格或Pandas的DataFrame
高层API:提供了更高级别的API,简化了数据处理和查询操作
优化器:利用Catalyst优化器来进行查询优化,能提高性能
灵活性:可以使用SQL语法进行查询,便于数据分析师和工程师使用
适用场景
处理结构化和半结构化数据
需要高效地进行查询和聚合操作
想要使用SQL风格的API进行数据处理
示例
import spark.implicits._
val df = Seq((1, "Alice"), (2, "Bob"), (3, "Charlie")).toDF("id", "name")
df.select("name").show()
val df = Seq((1, "Alice"), (2, "Bob"), (3, "Charlie")).toDF("id", "name")
df.select("name").show()
Schema
特点
数据结构定义:Schema定义了DataFrame或Dataset的结构,包括字段名称、类型及是否允许为空
元数据:包含关于DataFrame或Dataset的元数据信息,如字段类型和注释等
灵活性:可以通过编程方式显式定义,也可以通过反射机制自动推断
适用场景
需要对数据集的结构进行明确约束和验证
在读取外部数据源(如文件或数据库)时,需要自定义数据结构
示例
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true)
))
val data = spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"), Row(3, "Charlie")))
val dfWithSchema = spark.createDataFrame(data, schema)
dfWithSchema.show()
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true)
))
val data = spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"), Row(3, "Charlie")))
val dfWithSchema = spark.createDataFrame(data, schema)
dfWithSchema.show()
创建
通过结构化数据文件创建DataFrame
➢一般情况下,结构化数据文件存储在HDFS中,较为常见的结构化数据文件是Parquet文件或JSON文件
➢Spark SQL可以通过load()方法将HDFS上的结构化文件数据转换为DataFrame,load()方法默认导入的文件格式是Parquet
➢若加载JSON格式的文件数据,将其转换为DataFrame,则还需要使用format()方法
➢也可以直接使用json()方法将JSON文件数据转换为DataFrame
示例
使用 load() 方法加载 Parquet 文件
子主题
使用 format() 和 load() 方法加载 JSON 文件
使用 json() 方法直接加载 JSON 文件
通过外部数据库创建DataFrame
➢Spark SQL可以从外部数据库(比如MySQL、Oracle等数据库)中创建DataFrame
➢使用这种方式创建DataFrame需要通过JDBC连接或ODBC连接的方式访问数据库
示例
通过RDD创建DataFrame
自动指定Schema
利用反射机制推断RDD模式,使用这种方式首先需要定义一个case class,因为只有case class才能被Spark隐式地转换为DataFrame
示例
手动指定
示例
通过Hive中的·表创建DataFrame
使用SparkSession对象并调用sql方法查询Hive表的数据并转换为DataFrame
示例
#选择Hive中的test数据库
spark.sql("use test")
#讲Hive中test数据库中的people表转换为DataFrame
val people = spark,sql("select * from people")
spark.sql("use test")
#讲Hive中test数据库中的people表转换为DataFrame
val people = spark,sql("select * from people")
DataFrame操作
DataFrame查看/操作数据的方法
printSchema查看数据模式,会打印出列的名称和类型
show显示数据
show()和show(true)的作用都是显示前20条数据
show(false)显示所有数据
show(n),显示前n行的数据,n为int类型
show(n:Int,truncate: Boolean)
n是显示前n行
truncate: Boolean:控制是否截断列内容。如果设置为 true,则超过列宽度的内容会被截断显示。如果设置为 false,则会显示完整的列内容
first()获取第一行记录
无参数
head(n:Int)获取前n行记录
take(n:Int)获取前n行记录
takeAsList(n:Int)获取前n行数据,并以列表形式展现
collect()获取所有数据
无参数,将DataFrame的所有数据都获取到,并返回一个数组
collectAsList()获取所有数据
无参数,获取所有数据返回一个列表
将DataFrame注册为临时表,通过sql语句查询
regisrerTemTable方法注册为临时表
DataFrame.registerTemTable("TableName"),括号内是表名
sql语言查询,使用sqlContext方法,最后.rdd转换为RDD
val RDD = sqlContext.sql("select name,age from TableName where age > 20").rdd
查询结果
RDD.collect
DataFrame专门提供的查询方法
where条件查询
直接.where("条件" and/or "条件1")
括号里写条件,条件之间可以使用and或者or
返回的类型还是DataFrame类型
filter和where类似
select获取指定字段值
user.select("字段1",("字段2"),……)
获取指定字段的值,可以有多个,最后以DataFrame类型返回
selectExpr对指定字段进行特殊处理
传入字符串类型的参数,返会DtaFrame类型
字符串是数据的字段,字符串也可以加上UDF函数或者指定别名的代码
示例
col()和apply()获取一个字段
只能获取一个指定字段,返回对象为Column类型
使用 col 或者 apply 方法来获取DataFrame中的指定字段返回的是 Column 类型,而不是实际的数据。Column 类型代表DataFrame中的一列,并且提供了丰富的操作和转换方法,但它并不包含实际的数据值
当你调用 df.select(nameColumn, ageColumn) 时,实际上是构建了一个新的DataFrame,其中只包含了指定的列,而并没有实际取出这些列的数据。然后,调用 show() 方法来显示这个新的DataFrame中的数据
写法:val userCol = user.col("指定字段")同理可以换成apply
查看结果:df.select(userCol).collect
懒/转换操作:limit获取前n行数据
df.limit(n)获取前n行数据,形成一个新的DataFrame
orderBy字段排序
默认升序,可以有多个字段,逗号连接
降序
desc("字段")
import org.apache.spark.sql.functions.desc
df.orderBy(desc("age")).show()
df.orderBy(desc("age")).show()
$"字段".desc
df.orderBy($"age".desc).show()
-$"字段"
在字段名称前加 - 表示降序排序,这种用法更为简洁,但需要依赖于 Scala 的隐式转换机制,将 Column 对象进行取反操作实现降序排序。直接在字符串前加 - 无法生效,需要结合 $ 符号来引用列对象
df.orderBy(-$"age").show()
sort()方法和orderBy相同
groupBy字段分组
根据字段分组
参数可以是String类型的字段名或者Column类型的对象,参数可以有多个
字符串字段分组
一个参数
grouped_df = df.groupBy("department").sum("salary")根据字段分组并且计算这个字段的总salary薪水
多个参数
grouped_df_multiple = df.groupBy("department", "name").sum("salary")大概和上面差不多,多个字段分组
按Column对象分组
单个对象
grouped_df_col = df.groupBy(col("department")).sum("salary")
多个对象
grouped_df_col_multiple = df.groupBy(col("department"), col("name")).sum("salary")
返回的是GroupedData对象
GroupedData的操作方法
max("字段")
获取分组后,max里指定字段的最大值
min("字段")
获取分组后min里指定字段的最小值
mean("字段")
获取分组后,mean里指定最短的均值
sum("字段")
获取分组后,指定sum里面指定字段的和值
grouped_df_multiple = df.groupBy("department", "name").sum("salary")
grouped_df_col = df.groupBy(col("department")).sum("salary")
grouped_df_col_multiple = df.groupBy(col("department"), col("name")).sum("salary")
count()
获取分组中的元素个数
join连接表
三个参数:第一个必写,参数为另一个DataFrame,DataFrame1.join(DataFrame2)
给两个表做笛卡尔积连接
最终的表的行数是第一个表的行数乘以第二个表的行数
简单来说就是给两个表的每行数据当成一个独立的单个的数据然后做分组,找出所有的组合
参数二可选是一个Column对象
根据两表中相同的某个字段进行连接
joined_df = orders_df.join(customers_df, orders_df["customer_id"] == customers_df["customer_id"], "inner")
参数三可选是一个字符串,表示指定的连接类型
连接类型joinType只能是inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.中的一种
inner: 内连接,仅返回两张表中符合连接条件的行
cross: 笛卡尔积连接,没有连接条件,会返回所有可能的行组合
outer / full / fullouter / full_outer: 完全外连接,返回左表和右表中的所有行,并且对于没有匹配的行,结果中包含 Null 值
left / leftouter / left_outer: 左外连接,返回左表中的所有行以及右表中符合连接条件的行,对于没有匹配的右表行,结果中包含 Null 值
right / rightouter / right_outer: 右外连接,返回右表中的所有行以及左表中符合连接条件的行,对于没有匹配的左表行,结果中包含 Null 值
semi / leftsemi / left_semi: 半连接,返回左表中有匹配右表记录的行,不包括右表的列
anti / leftanti / left_anti: 反连接,返回左表中没有匹配右表记录的行
示例:
DataFrame输出操作/保存
save()方法保存为文件
➢将DataFrame数据保存为文件,实现步骤如下
•首先创建一个映射对象,用于存储save()方法需要用到的数据,这里将指定文件的头信息及文件的保存路径
•从user数据中选择出userId、gender和age这3列字段的数据
•调用save()方法将步骤(2)中的DataFrame数据保存至copyOfUser.json文件夹中
•在HDFS的/user/root/sparkSql目录下查看保存结果
完整示例
如果文件保存成功,你应该能够看到文件和文件夹列表。同时,可以使用 hdfs dfs -cat 命令来查看文件内容:
hdfs dfs -cat /user/root/sparkSql/copyOfUser.json/part-*.json
hdfs dfs -cat /user/root/sparkSql/copyOfUser.json/part-*.json
saveAsTable方法持久化为表
概念
➢saveAsTable()方法可以将DataFrame数据保存成持久化的表,并在Hive的元数据库中创建一个指针指向该表的位置,持久化的表会一直保留,即使Spark程序重启也没有影响,只要连接至同一个元数据服务即可读取表数据
➢读取持久化表时,只需要用表名作为参数,调用spark.table()方法方法即可得到对应DataFrame
➢默认情况下,saveAsTable()方法会创建一个内部表,表数据的位置是由元数据服务控制的。如果删除表,那么表数据也会同步删除
示例
0 条评论
下一页