sparkRDD复习
2024-06-23 21:06:41 2 举报
AI智能生成
SparkRDD复习涵盖了Apache Spark核心功能的关键概念,包括弹性分布式数据集(RDDs)。RDDs是Spark的基本数据结构,用于处理大规模数据。复习还涵盖了转换(如map、filter、reduceByKey)和行动(如collect、count、take)等操作。此外,复习还包括了对Spark SQL和DataFrame的介绍,它们提供了结构化数据处理的功能。复习资料还可能包括Spark Streaming和Spark MLlib等内容,它们分别用于处理实时数据和进行机器学习任务。这些内容对于理解Spark如何处理大数据以及如何在各种场景下应用Spark至关重要。
作者其他创作
大纲/内容
完整过程示例
自定义分区,打包jar,运行命令全过程示例
编译
打包jar包
sbt package
这会在 target/scala-2.12/ 目录下生成一个 JAR 文件,比如 your-spark-application_2.12-0.1.jar
运行命令
spark-submit --class PartitioningExample target/scala-2.12/your-spark-application_2.12-0.1.jar /path/to/input/file.txt /path/to/output/directory
--class PartitioningExample:指定要运行的主类,这里是 PartitioningExample
target/scala-2.12/your-spark-application_2.12-0.1.jar:你的应用程序的 JAR 文件
/path/to/input/file.txt:这是传递给程序的第一个参数,对应于 args(0)
/path/to/output/directory:这是传递给程序的第二个参数,对应于 args(1)
RDD创建读取存储数据
从内存中创建/读取RDD
将程序中已有的Seq集合(集合,列表,数组)转换为RDD
parallelize()方法
参数1:要转换的集合,必须是Seq集合
参数2:分区数,不设的话,默认为该程序分配到的资源的CPU核心数
makeRDD()方法
第一种用法和parallelize方法一致
第二种方法通过接受一个是Seq[(T,Seq[String])]参数类型创建RDD
T为数据值的类型
Seq[String]是该数据值对应(计算或存储)的分区位置(集群节点或本地分区)
对已有的RDD转换得到新的RDD
用zip方法将两个普通RDD组合成键值对RDD
具体内容见下面的大主题:RDD之间的操作
从外部读取数据创建RDD
通过SparkContext对象的textFile()方法读取数据集
读取hdfs文件创建RDD
通过textFile()方法读取HDFS文件的位置即可
读取本地文件创建RDD
也是通过sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从本地文件读取
但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它
读取/存储万能方法
这个是文本文件的操作,但是其他任何文件也都可以先当作文本文件读取
读取
textFile
存储
saveAsTextFile
JSON文件的读取和存储
读取
读取json数据
首先,你需要使用Spark的SparkSession对象来读取JSON数据。这可以通过read()函数和json()方法来完成。
val spark = SparkSession.builder.appName("SparkRDDWithJSON").getOrCreate()
val jsonData = spark.read.json("path/to/json/file.json")
这段代码创建了一个名为SparkRDDWithJSON的Spark应用,并使用getOrCreate()方法创建了一个SparkSession对象。然后,使用read()函数和json()方法来读取JSON数据,并将其存储在jsonData变量中
val spark = SparkSession.builder.appName("SparkRDDWithJSON").getOrCreate()
val jsonData = spark.read.json("path/to/json/file.json")
这段代码创建了一个名为SparkRDDWithJSON的Spark应用,并使用getOrCreate()方法创建了一个SparkSession对象。然后,使用read()函数和json()方法来读取JSON数据,并将其存储在jsonData变量中
解析json数据
接下来,你需要解析JSON数据,将其转换为结构化的数据格式。Spark提供了select()函数来选择特定的列,以及from_json()函数将JSON字符串转换为结构化的数据格式。
import org.apache.spark.sql.functions._
val parsedData = jsonData.select(from_json(col("jsonColumn"), schema).alias("parsedData"))
这段代码导入所需的函数,并使用select()函数选择名为jsonColumn的列。然后,使用from_json()函数将该列中的JSON字符串转换为结构化的数据格式,并将结果存储在parsedData变量中
import org.apache.spark.sql.functions._
val parsedData = jsonData.select(from_json(col("jsonColumn"), schema).alias("parsedData"))
这段代码导入所需的函数,并使用select()函数选择名为jsonColumn的列。然后,使用from_json()函数将该列中的JSON字符串转换为结构化的数据格式,并将结果存储在parsedData变量中
处理json数据
在这一步中,你可以根据需求对数据进行处理,例如筛选特定的字段、过滤数据等。
val filteredData = parsedData.select("parsedData.field1", "parsedData.field2").filter("parsedData.field1 = 'value'")
这段代码使用select()函数选择parsedData中的特定字段,并使用filter()函数过滤出满足条件的数据。你可以根据实际需求进行相应的处理
val filteredData = parsedData.select("parsedData.field1", "parsedData.field2").filter("parsedData.field1 = 'value'")
这段代码使用select()函数选择parsedData中的特定字段,并使用filter()函数过滤出满足条件的数据。你可以根据实际需求进行相应的处理
转换为RDD
使用rdd()函数将DataFrame转换为RDD。
val rddData = filteredData.rdd
这段代码将filteredData转换为RDD,并将结果存储在rddData变量中。
进行操作 | 在这一步中,你可以对RDD执行各种操作,例如map()、reduce()、filter()等。
val result = rddData.filter(_.startsWith("value")).count()
这段代码使用filter()函数过滤出以"value"开头的数据,并使用count()函数计算符合条件的数据的数量。你可以根据实际需求进行相应的操作
val rddData = filteredData.rdd
这段代码将filteredData转换为RDD,并将结果存储在rddData变量中。
进行操作 | 在这一步中,你可以对RDD执行各种操作,例如map()、reduce()、filter()等。
val result = rddData.filter(_.startsWith("value")).count()
这段代码使用filter()函数过滤出以"value"开头的数据,并使用count()函数计算符合条件的数据的数量。你可以根据实际需求进行相应的操作
返回结果
result
存储
// 写入 JSON 文件
val outputPath = "path/to/your/output.json"
filteredDF.write.json(outputPath)
val outputPath = "path/to/your/output.json"
filteredDF.write.json(outputPath)
CSV文件的读取和存储
读取
与上面类似改为:read.csv这样
存储
与上面类似改为:write.csv这样
SequenceFile文件的读取和存储
读取
读取
// 读取 SequenceFile
val inputPath = "hdfs://path/to/your/input/sequencefile"
val sequenceFileRDD = sc.sequenceFile[String, String](inputPath)
val inputPath = "hdfs://path/to/your/input/sequencefile"
val sequenceFileRDD = sc.sequenceFile[String, String](inputPath)
显示
// 显示原始数据
println("Original Data:")
sequenceFileRDD.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
println("Original Data:")
sequenceFileRDD.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
处理
// 进行一些简单的处理,例如过滤以 "key2" 开头的键
val filteredRDD = sequenceFileRDD.filter { case (key, _) => key.startsWith("key2") }
// 显示处理后的数据
println("Filtered Data:")
filteredRDD.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
val filteredRDD = sequenceFileRDD.filter { case (key, _) => key.startsWith("key2") }
// 显示处理后的数据
println("Filtered Data:")
filteredRDD.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
存储
// 将处理后的数据存储为新的 SequenceFile
val outputPath = "hdfs://path/to/save/output/sequencefile"
filteredRDD.saveAsSequenceFile(outputPath)
val outputPath = "hdfs://path/to/save/output/sequencefile"
filteredRDD.saveAsSequenceFile(outputPath)
完整示例
文本文件的读取和存储
就是上面的万能方法
RDD操作数据
懒操作/转换操作
map()方法
遍历RDD对每个元素进行某种函数操作
使用点操作符连接RDD,括号里是函数
sortBy()方法
第一个参数是x => K,x为每个RDD元素,k为要排序的元素
x=> x,表示就是x自己要排序
x=>x._2表示真正要排序的元素是x这个元素里面索引第二个元素
除了索引也可以进行其他处理,比如split切分之类的
第二个参数(可选)ascending,true升序(默认)
第三个参数(可选)numPartitions,排序后RDD分区个数
flatMap()方法
主要起遍历转换元素和展平嵌套列表结构的作用,只是根据括号内容不同而结果不同
切分单词作用
每行文本切分为单词并展平成一个单词的 RDD。
展平嵌套列表
过滤数据
filter()方法
过滤元素,接受一个参数为过滤的函数,返回值为Boolean(true,false)类型
例如:过滤每个元组第二个值小于等于1的元素,也就是保留下大于1的元素
rdd1.filter(_._2 > 1).collect
rdd2.filter(x => x._2 > 1).collect
distinct()方法
去重,去除RDD中相同的元素只留下一个,没有参数,但要写括号
例如:rdd.distinct().collect
combineByKey()方法
它接收三个参数,例如在进行累加操作时
combineByKey 会将所有不重复的键的一个值或第一个值放入第一个函数中做参数,然后将第一个函数返回的初始累加器放入第二个函数中做参数,并且将对应键的当前分区的所有值也送入这个函数做参数,然后第二个函数最后会返回每个分区每个键的总的累加器,然后combineByKey 会将这些累加器两个两个的放入第三个函数,将所有累加器合并成最后的累加器通过combineByKey 返回出来
这个combineByKey 还有一个特点就是函数在括号里面调用好像是不用自己写参数进去的,直接写个函数名就像,不用加括号
用法示例
累加示例
也不一定做累加操作,第一个函数,返回的也不一定是累加器,也可以是一个值,例如求最大值
也可以自定义数据结构
启动操作/行动操作
collect()方法
将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据
使用
直接调用,sql.list.collect,返回一个数组
再括号内写函数(函数名调用或匿名函数都行)。这个函数一般用于数据清洗处理
创建函数:val one: PartialFunction[Int, String] = {
case 1 => "one"
case _ => "other"
}这个函数一般用于数据清洗/处理/标记,这里是标记,但也可以用于删除不需要的数据。
case 1 => "one"
case _ => "other"
}这个函数一般用于数据清洗/处理/标记,这里是标记,但也可以用于删除不需要的数据。
使用:val numbersRDD: RDD[Int] = ... // 一个包含整数的 RDD
// 使用 collect 和偏函数 one,将整数转换为字符串
val transformedRDD: RDD[String] = numbersRDD.collect(one)
// 使用 collect 和偏函数 one,将整数转换为字符串
val transformedRDD: RDD[String] = numbersRDD.collect(one)
take()方法
点操作符连接,括号里是获取前几个元素,最后返回数组
lookup()方法
查找键值对RDD中指定的键的所有值
接收一个元素:"查找的键",返回的是这个键的所有的值的列表
RDD之间的操作
union合并RDD
合并整数RDD
合并元组RDD
两个RDD的元素类型必须一致
括号外的RDD的元素排在前面
join连接两个键值对RDD
内连接join
返回两个RDD相同键的值的列表(相同的键,(这个键在两个RDD中所有值的列表))
返回的是一个列表里面有一个一个键值对括号,每个括号里面第一个元素为键,第二个元素为这个键所有的值的列表
例如返回:((a,(1,1,……)),(b,(4,2,……)),……)这类的
右外连接rightOuterJoin
rdd1.rightOuterJoin(rdd2)
rdd1中有相同的键时
rdd2中有相同的键时
左外连接leftOuterJoin
与右外连接类似
全外连接fullOuterJoin
rdd1.fullOuterJoin(rdd2)
rdd1中有相同的键时
rdd2中有相同的键时
intersection求交集
求出两个RDD共同的元素
参数为另一个RDD,例:rdd1.intersection(rdd2).collect
substract删除相同元素
将前一个RDD中在后一个RDD出现的元素删除
返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果
cartesian()方法求笛卡尔积
笛卡尔积就是求两个集合两两组合的所有可能组合
用zip方法将两个普通RDD组合成键值对RDD
将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常
括号外的为键,括号内的为值
键值对RDD及其转换操作
概要
➢键值对RDD存储二元组,二元组分为键和值,RDD的基本转换操作对于键值对RDD也同样适用
➢因为键值对RDD中包含的是二元组,所以需要传递的函数会由原来的操作单个元素的函数改为操作二元组的函数
➢Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD
➢顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口
➢例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD
创建
将普通RDD使用map转换为键值对RDD
根据现有数据创建
从文本文件数据中生成
用zip方法将两个普通RDD组合成键值对RDD
具体的见上一个大主题:RDD之间的操作
keys和values方法
keys方法将所有键合在一起作为RDD返回
values方法将所有值合在一起作为RDD返回
reduceByKey()方法
点操作符连接RDD,带一个函数参数
一般用于统计学生总成绩之类的,键为学生,值为某个科目的成绩
reduceByKey 操作会对键值对 RDD 中的每个键进行分组,将具有相同键的值聚集到一起,并对这些值应用一个指定的合并函数来进行聚合。然后返回一个新的键值对 RDD,其中每个键对应的是通过函数合并后的单个值。
作用:将具有相同键的值锁定,然后一个一个放入函数中计算,把所有键的总值算完后合并以及返回新的键值对RDD,只是这个里面不会出现重复的键了
groupByKey()方法
单纯是对具有相同键的值进行分组
返回的结果类型为((键,{值的集合例如1,2,3……}),(同上),……)
返回类型的键也不会重复,然后原本值的位置变为了值的集合用{}包起来
join连接两个键值对RDD
具体内容见上一个大主题:RDD之间的操作里面
lookup()查找指定键的所有值
具体内容见上面的大主题:RDD操作数据
RDD持久化
cache()和persist()方法
cache()是默认级别MEMORY_ONLY,persist()可以在括号里指定级别
持久级别
MEMORY_ONLY
MEMOTY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
调用
持久化
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val initialSum = rdd.sum()
initialSum.persist() // 持久化初次计算的总和
val initialSum = rdd.sum()
initialSum.persist() // 持久化初次计算的总和
调用
// 假设新的数据到来了
val newRdd = sc.parallelize(Seq(6, 7, 8))
// 计算新数据的总和
val newSum = newRdd.sum()
// 更新持久化的总和
val updatedSum = initialSum + newSum
// 将更新后的总和再次持久化(如果需要)
updatedSum.persist()
val newRdd = sc.parallelize(Seq(6, 7, 8))
// 计算新数据的总和
val newSum = newRdd.sum()
// 更新持久化的总和
val updatedSum = initialSum + newSum
// 将更新后的总和再次持久化(如果需要)
updatedSum.persist()
broadcast广播变量
作用意义
广播变量用于将一个只读变量高效地分发到集群中的所有节点上。在一些情况下,我们需要在每个节点上多次使用一个较小的只读数据集(例如,一个查找表、配置参数或机器学习模型参数)。如果不使用广播变量,这些数据会在每个任务中通过网络重复传输,这样会导致大量的网络开销和性能下降。
场景
小型只读数据集:当你有一个相对较小的数据集,并且需要在每个任务中多次访问该数据集时,使用广播变量是非常合适的。这些数据集包括查找表、配置参数、机器学习模型参数等
避免重复传输:如果不使用广播变量,每个任务可能会在每次执行时通过网络传输相同的数据。这种重复传输会消耗网络带宽并导致性能瓶颈。广播变量可以将数据一次性发送到每个节点,从而避免重复传输
保持一致性:在需要确保数据一致性的场景下,广播变量是一个有效的解决方案。广播变量可以在分布式环境中保持一致性,即所有节点上的任务都会看到相同的数据副本
提高效率:频繁访问外部资源(如数据库、文件系统等)可能会导致性能瓶颈。通过将这些数据加载到内存中并以广播变量的形式分发,可以显著提高计算效率
使用示例
销毁广播变量释放内存
broadcastVar.destroy()
设置数据分区
分区ID:0~numPartitions-1,分区个数numPartitions
设置分区方式partitionBy()这个需要传递一个分区方式作为参数
分区方式
不做设置默认
Spark会根据数据的来源和当前集群的配置自动选择默认的分区方式。例如,如果你从HDFS读取数据,Spark会尝试使用HDFS文件的分片信息来决定分区方式;如果是通过并行化集合创建RDD,通常会根据集群的总核心数等因素进行分区。
哈希分区
哈希分区 (HashPartitioner)根据数据的哈希值来决定分区。
范围分区
范围分区 (RangePartitioner): 按数据的范围进行分区,这对于按顺序或按范围查询的数据非常有用。
自定义分区
自定义分区需要继承org.apach.spark.Partitioner类并实现其中的三个方法:def numPartitions:Int:返回想要创建的分区个数,def getPartition(key:Any):这个函数需要对输入的key做处理,然后返回该key的分区ID,范围一定是0~numPartitions-1。equals(oter:Any):这个是java标准的判断相等的函数,之所以要求用户实现这个函数,是因为spark内部会比较两个RDD的分区是否一样。
示例
coalesce(numPartitions:Int,shuffle:boolean=false)重分区
➢该函数用于将RDD 进行重分区,使用HashPartitioner分区方式
➢第一个参数为重分区的数目
➢第二个为是否进行shuffle,默认为false
➢shuffle 为false 时,重设分区个数只能比RDD 原有分区数小,如果要重设的分区个数大于RDD 原有的分区数目,RDD的分区数将不变
➢如果shuffle 为true 时,重设的分区数不管比原有的RDD 分区数多或者少,RDD 都可以重设分区个数
repartition(numPartitions:Int)重分区
➢repartition(num)的方法对数据进行重新分区
➢该函数其实就是coalesce函数第二个参数为true的实现,但重设分区个数没有限制
自定义分区,打包jar,运行命令全过程示例
编译
打包jar包
sbt package
这会在 target/scala-2.12/ 目录下生成一个 JAR 文件,比如 your-spark-application_2.12-0.1.jar
运行命令
spark-submit --class PartitioningExample target/scala-2.12/your-spark-application_2.12-0.1.jar /path/to/input/file.txt /path/to/output/directory
--class PartitioningExample:指定要运行的主类,这里是 PartitioningExample
target/scala-2.12/your-spark-application_2.12-0.1.jar:你的应用程序的 JAR 文件
/path/to/input/file.txt:这是传递给程序的第一个参数,对应于 args(0)
/path/to/output/directory:这是传递给程序的第二个参数,对应于 args(1)
0 条评论
下一页