sparkRDD复习
2024-06-23 21:06:41 0 举报
AI智能生成
登录查看完整内容
SparkRDD复习涵盖了Apache Spark核心功能的关键概念,包括弹性分布式数据集(RDDs)。RDDs是Spark的基本数据结构,用于处理大规模数据。复习还涵盖了转换(如map、filter、reduceByKey)和行动(如collect、count、take)等操作。此外,复习还包括了对Spark SQL和DataFrame的介绍,它们提供了结构化数据处理的功能。复习资料还可能包括Spark Streaming和Spark MLlib等内容,它们分别用于处理实时数据和进行机器学习任务。这些内容对于理解Spark如何处理大数据以及如何在各种场景下应用Spark至关重要。
作者其他创作
大纲/内容
编译
这会在 target/scala-2.12/ 目录下生成一个 JAR 文件,比如 your-spark-application_2.12-0.1.jar
sbt package
打包jar包
spark-submit --class PartitioningExample target/scala-2.12/your-spark-application_2.12-0.1.jar /path/to/input/file.txt /path/to/output/directory
--class PartitioningExample:指定要运行的主类,这里是 PartitioningExample
target/scala-2.12/your-spark-application_2.12-0.1.jar:你的应用程序的 JAR 文件
/path/to/input/file.txt:这是传递给程序的第一个参数,对应于 args(0)
/path/to/output/directory:这是传递给程序的第二个参数,对应于 args(1)
运行命令
自定义分区,打包jar,运行命令全过程示例
完整过程示例
参数1:要转换的集合,必须是Seq集合
参数2:分区数,不设的话,默认为该程序分配到的资源的CPU核心数
parallelize()方法
第一种用法和parallelize方法一致
T为数据值的类型
Seq[String]是该数据值对应(计算或存储)的分区位置(集群节点或本地分区)
makeRDD()方法
将程序中已有的Seq集合(集合,列表,数组)转换为RDD
具体内容见下面的大主题:RDD之间的操作
用zip方法将两个普通RDD组合成键值对RDD
对已有的RDD转换得到新的RDD
从内存中创建/读取RDD
通过textFile()方法读取HDFS文件的位置即可
读取hdfs文件创建RDD
也是通过sc.textFile(\"路径\")的方法实现的,在路径前面加上“file://”表示从本地文件读取
但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它
读取本地文件创建RDD
通过SparkContext对象的textFile()方法读取数据集
从外部读取数据创建RDD
这个是文本文件的操作,但是其他任何文件也都可以先当作文本文件读取
textFile
读取
saveAsTextFile
存储
读取/存储万能方法
首先,你需要使用Spark的SparkSession对象来读取JSON数据。这可以通过read()函数和json()方法来完成。val spark = SparkSession.builder.appName(\"SparkRDDWithJSON\").getOrCreate()val jsonData = spark.read.json(\"path/to/json/file.json\")这段代码创建了一个名为SparkRDDWithJSON的Spark应用,并使用getOrCreate()方法创建了一个SparkSession对象。然后,使用read()函数和json()方法来读取JSON数据,并将其存储在jsonData变量中
读取json数据
接下来,你需要解析JSON数据,将其转换为结构化的数据格式。Spark提供了select()函数来选择特定的列,以及from_json()函数将JSON字符串转换为结构化的数据格式。import org.apache.spark.sql.functions._val parsedData = jsonData.select(from_json(col(\"jsonColumn\
解析json数据
在这一步中,你可以根据需求对数据进行处理,例如筛选特定的字段、过滤数据等。val filteredData = parsedData.select(\"parsedData.field1\
处理json数据
使用rdd()函数将DataFrame转换为RDD。val rddData = filteredData.rdd这段代码将filteredData转换为RDD,并将结果存储在rddData变量中。进行操作 | 在这一步中,你可以对RDD执行各种操作,例如map()、reduce()、filter()等。val result = rddData.filter(_.startsWith(\"value\")).count()这段代码使用filter()函数过滤出以\"value\"开头的数据,并使用count()函数计算符合条件的数据的数量。你可以根据实际需求进行相应的操作
转换为RDD
result
返回结果
// 写入 JSON 文件 val outputPath = \"path/to/your/output.json\" filteredDF.write.json(outputPath)
JSON文件的读取和存储
与上面类似改为:read.csv这样
与上面类似改为:write.csv这样
CSV文件的读取和存储
// 读取 SequenceFile val inputPath = \"hdfs://path/to/your/input/sequencefile\
// 显示原始数据 println(\"Original Data:\
显示
// 进行一些简单的处理,例如过滤以 \"key2\
处理
// 将处理后的数据存储为新的 SequenceFile val outputPath = \"hdfs://path/to/save/output/sequencefile\" filteredRDD.saveAsSequenceFile(outputPath)
完整示例
SequenceFile文件的读取和存储
就是上面的万能方法
文本文件的读取和存储
RDD创建读取存储数据
SequenceFile 是 Hadoop 提供的一种二进制文件格式,用于存储键值对数据。它通常用于在 Hadoop 系统中进行高效的数据存储和交换。SequenceFile 可以包含任意类型的键值对,因此它非常灵活
遍历RDD对每个元素进行某种函数操作
使用点操作符连接RDD,括号里是函数
map()方法
x=> x,表示就是x自己要排序
x=>x._2表示真正要排序的元素是x这个元素里面索引第二个元素
除了索引也可以进行其他处理,比如split切分之类的
第一个参数是x => K,x为每个RDD元素,k为要排序的元素
第二个参数(可选)ascending,true升序(默认)
第三个参数(可选)numPartitions,排序后RDD分区个数
sortBy()方法
每行文本切分为单词并展平成一个单词的 RDD。
切分单词作用
展平嵌套列表
过滤数据
主要起遍历转换元素和展平嵌套列表结构的作用,只是根据括号内容不同而结果不同
flatMap()方法
rdd1.filter(_._2 > 1).collect
rdd2.filter(x => x._2 > 1).collect
例如:过滤每个元组第二个值小于等于1的元素,也就是保留下大于1的元素
filter()方法
去重,去除RDD中相同的元素只留下一个,没有参数,但要写括号
例如:rdd.distinct().collect
distinct()方法
combineByKey 会将所有不重复的键的一个值或第一个值放入第一个函数中做参数,然后将第一个函数返回的初始累加器放入第二个函数中做参数,并且将对应键的当前分区的所有值也送入这个函数做参数,然后第二个函数最后会返回每个分区每个键的总的累加器,然后combineByKey 会将这些累加器两个两个的放入第三个函数,将所有累加器合并成最后的累加器通过combineByKey 返回出来
它接收三个参数,例如在进行累加操作时
这个combineByKey 还有一个特点就是函数在括号里面调用好像是不用自己写参数进去的,直接写个函数名就像,不用加括号
累加示例
也不一定做累加操作,第一个函数,返回的也不一定是累加器,也可以是一个值,例如求最大值
也可以自定义数据结构
用法示例
combineByKey()方法
懒操作/转换操作
将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据
直接调用,sql.list.collect,返回一个数组
使用:val numbersRDD: RDD[Int] = ... // 一个包含整数的 RDD// 使用 collect 和偏函数 one,将整数转换为字符串val transformedRDD: RDD[String] = numbersRDD.collect(one)
再括号内写函数(函数名调用或匿名函数都行)。这个函数一般用于数据清洗处理
使用
collect()方法
点操作符连接,括号里是获取前几个元素,最后返回数组
take()方法
接收一个元素:\"查找的键\",返回的是这个键的所有的值的列表
查找键值对RDD中指定的键的所有值
lookup()方法
启动操作/行动操作
RDD操作数据
合并整数RDD
合并元组RDD
两个RDD的元素类型必须一致
括号外的RDD的元素排在前面
union合并RDD
返回的是一个列表里面有一个一个键值对括号,每个括号里面第一个元素为键,第二个元素为这个键所有的值的列表
内连接join
rdd1中有相同的键时
rdd2中有相同的键时
rdd1.rightOuterJoin(rdd2)
右外连接rightOuterJoin
与右外连接类似
左外连接leftOuterJoin
rdd1.fullOuterJoin(rdd2)
全外连接fullOuterJoin
join连接两个键值对RDD
求出两个RDD共同的元素
参数为另一个RDD,例:rdd1.intersection(rdd2).collect
intersection求交集
将前一个RDD中在后一个RDD出现的元素删除
返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果
substract删除相同元素
笛卡尔积就是求两个集合两两组合的所有可能组合
cartesian()方法求笛卡尔积
将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常
括号外的为键,括号内的为值
RDD之间的操作
➢键值对RDD存储二元组,二元组分为键和值,RDD的基本转换操作对于键值对RDD也同样适用
➢因为键值对RDD中包含的是二元组,所以需要传递的函数会由原来的操作单个元素的函数改为操作二元组的函数
➢Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD
➢顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口
➢例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD
概要
将普通RDD使用map转换为键值对RDD
根据现有数据创建
从文本文件数据中生成
具体的见上一个大主题:RDD之间的操作
创建
keys方法将所有键合在一起作为RDD返回
values方法将所有值合在一起作为RDD返回
keys和values方法
reduceByKey 操作会对键值对 RDD 中的每个键进行分组,将具有相同键的值聚集到一起,并对这些值应用一个指定的合并函数来进行聚合。然后返回一个新的键值对 RDD,其中每个键对应的是通过函数合并后的单个值。
一般用于统计学生总成绩之类的,键为学生,值为某个科目的成绩
作用:将具有相同键的值锁定,然后一个一个放入函数中计算,把所有键的总值算完后合并以及返回新的键值对RDD,只是这个里面不会出现重复的键了
点操作符连接RDD,带一个函数参数
reduceByKey()方法
单纯是对具有相同键的值进行分组
返回类型的键也不会重复,然后原本值的位置变为了值的集合用{}包起来
groupByKey()方法
具体内容见上一个大主题:RDD之间的操作里面
具体内容见上面的大主题:RDD操作数据
lookup()查找指定键的所有值
键值对RDD及其转换操作
cache()是默认级别MEMORY_ONLY,persist()可以在括号里指定级别
cache()和persist()方法
MEMORY_ONLY
MEMOTY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
持久级别
持久化
调用
广播变量用于将一个只读变量高效地分发到集群中的所有节点上。在一些情况下,我们需要在每个节点上多次使用一个较小的只读数据集(例如,一个查找表、配置参数或机器学习模型参数)。如果不使用广播变量,这些数据会在每个任务中通过网络重复传输,这样会导致大量的网络开销和性能下降。
作用意义
小型只读数据集:当你有一个相对较小的数据集,并且需要在每个任务中多次访问该数据集时,使用广播变量是非常合适的。这些数据集包括查找表、配置参数、机器学习模型参数等
避免重复传输:如果不使用广播变量,每个任务可能会在每次执行时通过网络传输相同的数据。这种重复传输会消耗网络带宽并导致性能瓶颈。广播变量可以将数据一次性发送到每个节点,从而避免重复传输
保持一致性:在需要确保数据一致性的场景下,广播变量是一个有效的解决方案。广播变量可以在分布式环境中保持一致性,即所有节点上的任务都会看到相同的数据副本
提高效率:频繁访问外部资源(如数据库、文件系统等)可能会导致性能瓶颈。通过将这些数据加载到内存中并以广播变量的形式分发,可以显著提高计算效率
场景
使用示例
broadcastVar.destroy()
销毁广播变量释放内存
broadcast广播变量
RDD持久化
分区ID:0~numPartitions-1,分区个数numPartitions
设置分区方式partitionBy()这个需要传递一个分区方式作为参数
Spark会根据数据的来源和当前集群的配置自动选择默认的分区方式。例如,如果你从HDFS读取数据,Spark会尝试使用HDFS文件的分片信息来决定分区方式;如果是通过并行化集合创建RDD,通常会根据集群的总核心数等因素进行分区。
不做设置默认
哈希分区 (HashPartitioner)根据数据的哈希值来决定分区。
哈希分区
范围分区 (RangePartitioner): 按数据的范围进行分区,这对于按顺序或按范围查询的数据非常有用。
范围分区
自定义分区需要继承org.apach.spark.Partitioner类并实现其中的三个方法:def numPartitions:Int:返回想要创建的分区个数,def getPartition(key:Any):这个函数需要对输入的key做处理,然后返回该key的分区ID,范围一定是0~numPartitions-1。equals(oter:Any):这个是java标准的判断相等的函数,之所以要求用户实现这个函数,是因为spark内部会比较两个RDD的分区是否一样。
示例
自定义分区
分区方式
➢该函数用于将RDD 进行重分区,使用HashPartitioner分区方式
➢第一个参数为重分区的数目
➢第二个为是否进行shuffle,默认为false
➢shuffle 为false 时,重设分区个数只能比RDD 原有分区数小,如果要重设的分区个数大于RDD 原有的分区数目,RDD的分区数将不变
➢如果shuffle 为true 时,重设的分区数不管比原有的RDD 分区数多或者少,RDD 都可以重设分区个数
➢repartition(num)的方法对数据进行重新分区
➢该函数其实就是coalesce函数第二个参数为true的实现,但重设分区个数没有限制
repartition(numPartitions:Int)重分区
设置数据分区
sparkRDD复习
从内存中读取数据创建RDD的方法常用于测试,从外部存储系统中读取数据创建RDD才是用于实践操作的常用方法。
应对不断有新数据加入的情况:相当于如果要计算一个实时数据的求和,人来做的话就是把现有的所有数据求和算出来然后后面再加进来的直接和算出开的结果求和,不断更新这个结果就行,但是sparkrdd不会这样,他是计算机程序,他只能每次加入一个新数据都把所有数据重新算一遍,这样确实会浪费很多资源,那为了他想人类一样计算于是将第一次计算的结果持久化下来,然后写程序让新的数据和持久化下来的数据进行求和,再次将结果持久化
0 条评论
回复 删除
下一页