sparkAPI、RDD总结
2019-11-27 10:08:12 0 举报
AI智能生成
spark结构化apijiRDD总结
作者其他创作
大纲/内容
Spark RDD小结
1. 什么是RDD
介绍
弹性分布式数据集,一种容错的并行数据结构
一种数据抽象,只读的、分区记录集合
——在此之上,提供了丰富的操作用来处理RDD
——在此之上,提供了丰富的操作用来处理RDD
Spark的基石,也是Spark的灵魂。
——RDD是Spark最核心最精髓的部分,Spark将所有数据都抽象成RDD。
——RDD是Spark最核心最精髓的部分,Spark将所有数据都抽象成RDD。
5个特性
分区信息(Partition):
-- 数据集的基本组成单位
-- 数据集的基本组成单位
一系列的分区信息。
每一个分区都会被一个任务处理。 ---决定了并行度。
创建RDD时,可以指定RDD的分区数,如果没有指定,采用默认值。
每一个分区都会被一个任务处理。 ---决定了并行度。
创建RDD时,可以指定RDD的分区数,如果没有指定,采用默认值。
RDD是一组分区,RDD由分区组成
分区个数默认与CPU核数个数有关
计算的函数 :
-- 对于给定的数据集,需要做哪些计算
-- 对于给定的数据集,需要做哪些计算
由一个函数计算每一个分片。RDD的计算以分片为单位。
依赖关系 :
-- RDD的依赖关系,描述了RDD之间的Lineage
-- RDD的依赖关系,描述了RDD之间的Lineage
RDD每一次转换都生成一个新的RDD,多个RDD之间有前后依赖关系。
在某个分区数据丢失时,Spark可以通过这层依赖关系重新计算丢失的分区数据,
而不是重头对RDD的所有分区数据进行计算。→容错性
在某个分区数据丢失时,Spark可以通过这层依赖关系重新计算丢失的分区数据,
而不是重头对RDD的所有分区数据进行计算。→容错性
Partitioner
函数:
-- 对于计算出来的数据结果如何分发
函数:
-- 对于计算出来的数据结果如何分发
Partitioner是RDD中的分区函数,数据按一定规则分配到指定的Reducer上去处理。
两种分区;Hash Partitioner、RangePartitioner
key-value的数据才有Partitioner,普通的数据Partitioner为None
两种分区;Hash Partitioner、RangePartitioner
key-value的数据才有Partitioner,普通的数据Partitioner为None
优先位置列表 :
-- 对于data partition的位置偏好
-- 对于data partition的位置偏好
HDFS -> Partitioner所在的Block的位置。
分配任务时,会尽量将任务分配给处理数据块的位置。
分配任务时,会尽量将任务分配给处理数据块的位置。
2. 创建RDD
基于parallelize创建
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
.split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
.split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
parallelize函数可以传入分片个数参数,否则采用defaultParallelism。
基于外部数据源创建
distFile = sc.textFile("file:///home/camel/Repos/spark/README.md")
distFile.count()
distFile.count()
textFile函数支持从多种源创建RDD,如hdfs://,s3n://
基于父RDD转换得来
rdd2 = rdd1.xxx()
入口:
spark.sparkContext # 或者直接调用 sc
spark.sparkContext # 或者直接调用 sc
3. RDD常用算子
转换(transformantion)
在一个已存在的 RDD上创建一个新的 RDD,
但实际的计算并没有执行,仅仅记录操作过程
但实际的计算并没有执行,仅仅记录操作过程
在RDD上调用distinct方法,删除重复数据:
words.distinct().count() #9
words.distinct().count() #9
对RDD进行过滤,保留以字母“ S”开头的单词:
def startsWithS(individual):
return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()
def startsWithS(individual):
return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()
map: 将函数作用到数据集的每一个元素上,生成一个新的分布式的数据集(RDD)返回
words2 = words.map(lambda word: (word, word[0], word.startswith("S")))
words2.filter(lambda record: record[2]).take(5)
words2 = words.map(lambda word: (word, word[0], word.startswith("S")))
words2.filter(lambda record: record[2]).take(5)
flatMap操作也是对RDD中每个元素进行操作的,但是它的操作结果是一对一或者是一对多的
words.flatMap(lambda word: list(word)).take(5)
words.flatMap(lambda word: list(word)).take(5)
按单词长度从最长到最短排序
words.sortBy(lambda word: len(word) * -1).take(2)
words.sortBy(lambda word: len(word) * -1).take(2)
动作(action)
执行 RDD记录的所有运行transformations操作,
并计算结果,结果可返回到 driver程序
并计算结果,结果可返回到 driver程序
指定一函数将RDD中数据任意个数的数据值合并为一个值
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210
count
使用它计算RDD中的行数
words.count() #9
使用它计算RDD中的行数
words.count() #9
first
返回结果集的第一个值
words.first()
返回结果集的第一个值
words.first()
max and min
分别返回结果中最大、最小值
sc.parallelize(range(1, 20)).max()
sc.parallelize(range(1, 20)).min()
分别返回结果中最大、最小值
sc.parallelize(range(1, 20)).max()
sc.parallelize(range(1, 20)).min()
保存结果数据到文件
saveAsTextFile
words.saveAsTextFile(“file:/tmp/bookTitle”)
saveAsTextFile
words.saveAsTextFile(“file:/tmp/bookTitle”)
Spark结构化API总结
1.创建DataFrame和SQL临时表
创建Dataframe
从数据源创建
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
通过row转换
导包:
from pyspark.sql import Row
from pyspark.sql.types
import StructField, StructType, StringType, LongType
from pyspark.sql import Row
from pyspark.sql.types
import StructField, StructType, StringType, LongType
myManualSchema = StructType([ StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False) ])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
StructField("col", StringType(), True),
StructField("names", LongType(), False) ])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
从RDD
创建Sql临时表
df.createOrReplaceTempView("dfTable")
Schema
查看:df.printSchema()
创建:
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata={"hello":"world"})
])
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata={"hello":"world"})
])
应用:df = spark.read.format("json").schema(myManualSchema)\
.load("/data/flight-data/json/2015-summary.json")
.load("/data/flight-data/json/2015-summary.json")
2.数据源对接
读模式
读取数据的核心结构:
DataFrameReader.format(...).option("key", "value").schema(...).load()
DataFrameReader.format(...).option("key", "value").schema(...).load()
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
写模式
写入数据的核心结构:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(
...).save()
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(
...).save()
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
可对接的六大核心数据源
CSV
JSON
Parquet
ORC
JDBC/ODBC connections
Plain-text files
JSON
Parquet
ORC
JDBC/ODBC connections
Plain-text files
3.dataframe基本操作
转换
字段选择
Python:
df.select("DEST_COUNTRY_NAME").show(2)
df.select("DEST_COUNTRY_NAME").show(2)
Spark SQL:
SELECT columnName * 10, otherColumn, someOtherCol as c FROM dataFrameTable
SELECT columnName * 10, otherColumn, someOtherCol as c FROM dataFrameTable
增加列
Python:
df.withColumn("numberOne", lit(1)).show(2)
df.withColumn("numberOne", lit(1)).show(2)
Spark SQL:
SELECT *, 1 as numberOne FROM dfTable LIMIT 2
SELECT *, 1 as numberOne FROM dfTable LIMIT 2
修改列
Python:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
删除列
Python:
df.drop("ORIGIN_COUNTRY_NAME").columns
df.drop("ORIGIN_COUNTRY_NAME").columns
修改类型
Python :
df.withColumn("count2", col("count").cast("long"))
df.withColumn("count2", col("count").cast("long"))
条件过滤
Python :
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
Spark SQL:
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
去重
Python:
df.select("id","sno").distinct().count()
df.select("id","sno").distinct().count()
Spark SQL:
SELECT COUNT(DISTINCT(id, sno)) FROM dfTable
SELECT COUNT(DISTINCT(id, sno)) FROM dfTable
排序
Python:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
Spark SQL:
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
有限选择
Python:
df.orderBy(expr("count desc")).limit(6).show()
df.orderBy(expr("count desc")).limit(6).show()
Spark SQL:
SELECT * FROM dfTable ORDER BY count desc LIMIT 6
SELECT * FROM dfTable ORDER BY count desc LIMIT 6
4.dataframe聚合操作
聚合算法
计数
Python:
df.select(count("StockCode")).show()
df.select(count("StockCode")).show()
Spark SQL:
SELECT COUNT(*) FROM dfTable
SELECT COUNT(*) FROM dfTable
去重计数
Python:
df.select(countDistinct("StockCode")).show()
df.select(countDistinct("StockCode")).show()
Spark SQL:
SELECT COUNT(DISTINCT *) FROM DFTABLE
SELECT COUNT(DISTINCT *) FROM DFTABLE
规定有效数字计数
Python:
df.select(approx_count_distinct("StockCode", 0.1)).show()
df.select(approx_count_distinct("StockCode", 0.1)).show()
Spark SQL:
SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE
SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE
第一个和最后一个
Python:
df.select(first("StockCode"), last("StockCode")).show()
df.select(first("StockCode"), last("StockCode")).show()
Spark SQL:
SELECT first(StockCode), last(StockCode) FROM dfTable
SELECT first(StockCode), last(StockCode) FROM dfTable
最大最小值
Python:
df.select(min("Quantity"), max("Quantity")).show()
df.select(min("Quantity"), max("Quantity")).show()
Spark SQL:
SELECT min(Quantity), max(Quantity) FROM dfTable
SELECT min(Quantity), max(Quantity) FROM dfTable
求和
Python:
df.select(sum("Quantity")).show()
df.select(sum("Quantity")).show()
Spark SQL:
SELECT sum(Quantity) FROM dfTable
SELECT sum(Quantity) FROM dfTable
区别求和
Python:
df.select(sumDistinct("Quantity")).show()
df.select(sumDistinct("Quantity")).show()
Spark SQL:
SELECT SUM(Quantity) FROM dfTable
SELECT SUM(Quantity) FROM dfTable
......
分组聚合
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()
实现K-V映射
Python:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()
Spark SQL:
SELECT avg(Quantity), stddev_pop(Quantity), InvoiceNo FROM dfTable
GROUP BY InvoiceNo
SELECT avg(Quantity), stddev_pop(Quantity), InvoiceNo FROM dfTable
GROUP BY InvoiceNo
5. SparkSQL
创建表
CREATE TABLE flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')
删除表
DROP TABLE flights_csv;
DROP TABLE IF EXISTS flights_csv;
插入数据
INSERT INTO flights_from_select
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
收藏
收藏
0 条评论
下一页