sparkAPI、RDD总结
2019-11-27 10:08:12 0 举报
AI智能生成
spark结构化apijiRDD总结
作者其他创作
大纲/内容
Spark RDD小结
1. 什么是RDD
介绍
弹性分布式数据集,一种容错的并行数据结构
一种数据抽象,只读的、分区记录集合——在此之上,提供了丰富的操作用来处理RDD
Spark的基石,也是Spark的灵魂。——RDD是Spark最核心最精髓的部分,Spark将所有数据都抽象成RDD。
5个特性
分区信息(Partition):-- 数据集的基本组成单位
一系列的分区信息。每一个分区都会被一个任务处理。 ---决定了并行度。 创建RDD时,可以指定RDD的分区数,如果没有指定,采用默认值。
RDD是一组分区,RDD由分区组成
分区个数默认与CPU核数个数有关
计算的函数 : -- 对于给定的数据集,需要做哪些计算
由一个函数计算每一个分片。RDD的计算以分片为单位。
依赖关系 : -- RDD的依赖关系,描述了RDD之间的Lineage
RDD每一次转换都生成一个新的RDD,多个RDD之间有前后依赖关系。 在某个分区数据丢失时,Spark可以通过这层依赖关系重新计算丢失的分区数据,而不是重头对RDD的所有分区数据进行计算。→容错性
Partitioner函数: -- 对于计算出来的数据结果如何分发
Partitioner是RDD中的分区函数,数据按一定规则分配到指定的Reducer上去处理。两种分区;Hash Partitioner、RangePartitioner key-value的数据才有Partitioner,普通的数据Partitioner为None
优先位置列表 :-- 对于data partition的位置偏好
HDFS -> Partitioner所在的Block的位置。 分配任务时,会尽量将任务分配给处理数据块的位置。
2. 创建RDD
基于parallelize创建
myCollection = \"Spark The Definitive Guide : Big Data Processing Made Simple\"\\ .split(\" \
parallelize函数可以传入分片个数参数,否则采用defaultParallelism。
基于外部数据源创建
distFile = sc.textFile(\"file:///home/camel/Repos/spark/README.md\")distFile.count()
textFile函数支持从多种源创建RDD,如hdfs://,s3n://
基于父RDD转换得来
rdd2 = rdd1.xxx()
入口:spark.sparkContext # 或者直接调用 sc
3. RDD常用算子
转换(transformantion)
在一个已存在的 RDD上创建一个新的 RDD,但实际的计算并没有执行,仅仅记录操作过程
在RDD上调用distinct方法,删除重复数据:words.distinct().count() #9
对RDD进行过滤,保留以字母“ S”开头的单词:def startsWithS(individual): return individual.startswith(\"S\")words.filter(lambda word: startsWithS(word)).collect()
flatMap操作也是对RDD中每个元素进行操作的,但是它的操作结果是一对一或者是一对多的words.flatMap(lambda word: list(word)).take(5)
按单词长度从最长到最短排序words.sortBy(lambda word: len(word) * -1).take(2)
动作(action)
执行 RDD记录的所有运行transformations操作,并计算结果,结果可返回到 driver程序
count使用它计算RDD中的行数words.count() #9
first返回结果集的第一个值words.first()
保存结果数据到文件saveAsTextFilewords.saveAsTextFile(“file:/tmp/bookTitle”)
Spark结构化API总结
1.创建DataFrame和SQL临时表
创建Dataframe
从数据源创建
df = spark.read.format(\"json\").load(\"/data/flight-data/json/2015-summary.json\")
通过row转换
myManualSchema = StructType([ StructField(\"some\
从RDD
创建Sql临时表
df.createOrReplaceTempView(\"dfTable\")
Schema
查看:df.printSchema()
创建:myManualSchema = StructType([StructField(\"DEST_COUNTRY_NAME\
应用:df = spark.read.format(\"json\").schema(myManualSchema)\\.load(\"/data/flight-data/json/2015-summary.json\")
2.数据源对接
读模式
读取数据的核心结构:DataFrameReader.format(...).option(\"key\
spark.read.format(\"csv\").option(\"mode\
写模式
写入数据的核心结构:DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
dataframe.write.format(\"csv\").option(\"mode\
可对接的六大核心数据源
CSVJSONParquetORCJDBC/ODBC connectionsPlain-text files
3.dataframe基本操作
转换
字段选择
Python: df.select(\"DEST_COUNTRY_NAME\").show(2)
增加列
Python:df.withColumn(\"numberOne\
修改列
Python:df.withColumnRenamed(\"DEST_COUNTRY_NAME\
删除列
Python: df.drop(\"ORIGIN_COUNTRY_NAME\").columns
修改类型
Python : df.withColumn(\"count2\
条件过滤
Python :df.filter(col(\"count\") < 2).show(2) df.where(\"count < 2\").show(2)
Spark SQL:SELECT * FROM dfTable WHERE count < 2 LIMIT 2
去重
Python:df.select(\"id\
排序
Python:df.orderBy(col(\"count\
有限选择
Python:df.orderBy(expr(\"count desc\")).limit(6).show()
Spark SQL:SELECT * FROM dfTable ORDER BY count desc LIMIT 6
4.dataframe聚合操作
聚合算法
计数
Python:df.select(count(\"StockCode\")).show()
Spark SQL:SELECT COUNT(*) FROM dfTable
去重计数
Python:df.select(countDistinct(\"StockCode\")).show()
Spark SQL:SELECT COUNT(DISTINCT *) FROM DFTABLE
规定有效数字计数
Python:df.select(approx_count_distinct(\"StockCode\
第一个和最后一个
Python:df.select(first(\"StockCode\
最大最小值
Python:df.select(min(\"Quantity\
求和
Python:df.select(sum(\"Quantity\")).show()
Spark SQL:SELECT sum(Quantity) FROM dfTable
区别求和
Python:df.select(sumDistinct(\"Quantity\")).show()
Spark SQL:SELECT SUM(Quantity) FROM dfTable
......
分组聚合
from pyspark.sql.functions import countdf.groupBy(\"InvoiceNo\").agg(count(\"Quantity\").alias(\"quan\
实现K-V映射
Python:df.groupBy(\"InvoiceNo\").agg(expr(\"avg(Quantity)\
5. SparkSQL
创建表
删除表
DROP TABLE flights_csv;
DROP TABLE IF EXISTS flights_csv;
插入数据
收藏
收藏
0 条评论
回复 删除
下一页