BigData - Spark知识点整理
2021-02-09 11:44:04 84 举报
AI智能生成
BigData - Spark知识点整理
作者其他创作
大纲/内容
Spark Core
系统架构
运行模式
Local
调试
Standalone
集群,Spark原生的CM
Mesos
集群
Yarn
集群, Yarn ResourceManager
概念组成
Application应用程序
基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor
用户写的
Driver驱动
运行Application的main()函数,并且创建SparkContext;
new SparkContext(conf): appName;Master;conf
用户写的RDD DAG
通常SparkContext代表了Driver
可能本地运行,也可能在某worker上
根据SparkContext向cluster Manager(或者Yarn)申请资源
RDD 的 DAG,逐层RDD及其依赖关系
将Application分解成多个stage,stage中包含多个tasks
Cluster Manager
在集群上获取资源的外部服务
Standalone|ResourceManager(Yarn)|Mesos
taskscheuler分配资源给executor
Executor
某Application运行在Worker Node上的一个进程
Heartbeat向CM|Yarn汇报状态
集群架构
Master
Master进程
Driver进程
Worker
Worker进程
executor进程
执行task,内存20%
shuffle copy缓存 20%
spark.shuffle.memoryFraction
RDD持久化 60%
spark.storage.memoryFraction
原理、流程
RDD: 内存共享模型,弹性分布式数据集
对象的集合,只保存元数据信息, 有各种方法,operation()
分布式
数据分布在磁盘/HDFS上,不在RDD里面
只读
静态的每次transform都生成一个新的RDD
迭代式算法
迭代式,出错找父RDD
容错性
数据丢失时,可以进行重建;可对RDD缓存
惰性调用
Transform不做运算,只记录过程;Action才进行计算
生成方法
parallelize(,partition)
rdd=sc.parallelize(list,partition)
文件加载
spark.sparkContext.textFile
spark.read.csv
spark.read.json
option(multipleline=true)
InputFormat
HDFS, Hive, Hbase
转换生成新RDD
算子
Transform
RDD间的转换;得到另外一个RDD, 不会马上提交spark运行
map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues
宽依赖 Shuffle
repartition,join,cogroup,sortBy,reduceByKey, groupByKey,*ByKey等算子
子 RDD 的各个分片会依赖于父RDD 的多个分片
会触发Shuffle,产生的RDD叫做ShuffleRDD
窄依赖
map,filter,mapPartitions,mapValues,union
子 RDD的各个分片(partition)不依赖于其他父RDD分片
不需要Shuffle
Action
collect, reduce, count, save, lookupKey,take,countbykey,countbyvalue
提交spark运行(生成从头开始的job),且返回结果,不是返回RDD
collect等返回结构到driver端,内存消耗
Shuffle
Shuffle是分区间的行为
需要写入到磁盘并通过网络传输,有时还需要对数据进行排序,消耗网络以及磁盘的I/O
可以再shuffle阶段,写入Combiner
Shuffle write
类Map,将ShuffleMapTask的结果写入到内存
Shuffle fetch
类reduce过程,获取ShuffleMapTask的结果给ShuffleReduceTask
DAG
Directed Acycle graph,反应RDD之间的依赖关系
从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分
Stage
DAG中,Transform环节,根据宽依赖的Transform来分stage;stage边缘会触发Shuffle
Taskset: task以数据分区做划分
一个分区一个task
Partition分区
优先考虑数据本地化
数据从HDFS读入,分区是默认的,不需要partitioner
shuffleMap分区
数据读入时的分区inputSplit,并行度分区; 到后面的k,v整理好的hashpartition
file.blocksize|defaul.parallelism
shuffleReduce分区
reduce算子决定的分区数,默认也是由spark.default.parallelism决定
分区片大小上限
spark.files.maxPartitionBytes
Task
executor中的线程,每个task使用一个vcore(spark.task.cpus)
一个partition由一个task运行,partition数决定task数
流程
1. Spark-submit提交
2.启动Driver进程,
2.1运行Application运行环境
2.2创建SparkContext
2.3向Yarn等集群资源管理器获取资源,运行executor
2.4.ClusterMaster分配Executor
3. SparkContext将应用程序代码分成stages,一个stage包含多个tasks
3.1SparkContext 根据RDD 的依赖关系构建DAG图
3.2 DAGScheduler 将DAG解析成stages
3.3 Stages发送给TaskScheduler
4.ClusterManager|Yarn分配资源,启动excutor,到各个worker上执行excutor
4.1 Executor向SparkCatext申请task
4.2 TaskScheduler分配Task给executor
4.3 SparkContext将应用程序发送给executor
5.Executor的并发度由CPU Core决定,每个core上同时一个task
pySpark,实际操作用法
SparkSession
2.0之前SparkContext,SparkSession封装了SparkContext
ss.read.[option('header','true').]text()生成sql.DataFrame
.rdd生成pySpark.rdd.RDD
.map生成PipelineRDD
如果是sc, sc.textFile(file,partitions),生成的是RDD
封装了SparkConf
SparkSession.conf()
封装了sqlConext
SparkSession.sql("select * from")
ss.close()
只要不close,rdd一直在内存中,一直可以复用
SparkConf()
SparkConf().get("spark.executor.cores")
SparkConf().getAll()
RDD生成
sparkContext.parallelize(list,partition)
生成ParallelCollectionRDD
第二个参数可以制定partition个数,必须>=2
read.csv|text|json
读取压缩文件需要repartition
ss.read.option("header","true").text(,partition) 读取文件头
分区,受defaultPartitions以及HDFS文件blocks,以及total cores等影响
Hive|Hbase|Mysql
Transform生成新的RDD
RDD转换Transform
dir(RDD)来获取所有的方法
join()
窄: 当两个RDD都使用hashpartition且分区数据一样,数据不需要shuffle
宽: 两个RDD的分区不一致,这时候就是宽依赖
提供join, leftOuterJoin,rightOuterJoin,但是只针对k-v pair
窄依赖
map(func) | 窄
mapPartitions每次处理一个分区,map每次一条
flatMap(func) | 窄
map后对rdd再进行一次flatten,迭代器,扁平化输出
mapValues(func)
只用在k,v上,把所有v用func处理一遍,k不变
flatMapValues(func)
可以将(k,(v1,v2))展开成(k,v1),(k,v2)
a.union(b) | 窄
不会去重
filter(func) | 窄
filter(lambda x:条件)
keys,values | 窄
sample(withReplacement,fraction=0.01)
取样1%, 检查数据倾斜
takeSample(withReplacement,num)
mapPartitions
function一次接收所有的partition数据,吐给mysql等只需要一个par一个连接
容易造成OOM
宽依赖 Shuffle
distinct() | 宽
无参数,返回rdd,未排序
instersection | 宽
需要,相当于join了
substract 宽
需要shuffle
sortByKey(ascending,NumPartition)
只能用于k,v, 且只能排k
sortByKey()不写默认以asc排序,以k为排序
sortBy(lambda x:x[1],ascending,NumPartition)
sortBy底层是sortByKey(),默认升序;
不只用于(k,v),既可以用于(k),也可以用于(x,x,x,x,x);比sortByKey多一个参数
groupBy(func)
接受函数分组,将key分组,同时所有key转成迭代器
groupByKey(numPartition,partitioner)
只能用于k,v
默认不用参数,输出结果类似groupBy(func);以key分组,同时把v转成迭代器
groupByKey(func)返回的是(key,iteratebal迭代器)
纯生成迭代器,所以只有shuffle,没有reduce功能
reduceByKey(func)
只能用于k,v,其他会报错
至少有1个函数参数,shuffle前聚合; 区别于reduce,后者是action
可以当作是spark中的combiner来用,combine逻辑自定义,一样不能avg
*ByKey | 宽
reduceByKey先聚合再shuffle,groupByKey先shuffle再聚合
aggregate(初始值,分区内聚合func,分区聚合func),也是比groupByKey高效
stage边缘,shuffle需要; 判断宽窄的唯一标准就是需不需要shuffle。子rdd一个分区中的数据是否依赖于多个父rdd分区
RDD动作Action
countByKey()
注意这个是个action,返回一个dict
如果对象是字符串,首字母作为key;如果是k,v,以k作为key来count
countByValue()
不管对象传进来什么,全部拿来做keyCount;适合做word Count
count()|min|max|sum|mean()
返回int
collect()
stdout,将rdd转换成list
all into Driver's memory!!! attention!
first()
取值都是action
take(10)
默认顺序,比较快
生成list
takeOrdered()和top相反,默认升序
top()
数字按大小排序
lookup()
rdd.lookup(someKey)
reduce(func)
并行整合,返回一个值,汇总或最大最小
.reduce(lambda x,y:x+y), 只是代表其中元素相加
输出类型和输入一样
区别于reduceByKey,这个是一个action
rdd.saveAs*
生成一个目录
foreach, foreachPartitions
foreachParttion可以减少连接数
RDD进阶
broadcast
将小 RDD 复制广播到每个 Executor 的内存里
persist|cache
将RDD计算写入内存(或者磁盘),根据缓存级别不同
unpersist
cache = persist(memoryonly)
checkPoint
写入hdfs,rdd关系截断,后续计算都从磁盘二进制文件读取
RDD分区
.getNumPartitions()
获取partition个数
.glom
按照分区显示输出
repartition
强制shuffle的coalesce,repartition=coalesce(n,shuffle=True)
增加分区,不shuffle则无意义。rdd.repartition(100)
coalesce
减少分区的时候,建议用coalesce,可以shuffle=false
分区
分区函数
HashPartition
RangePartition
每个分区分配一个task
分区就决定了stage task的并行度
shuffle之前的分区数,不会变的,直到shffufle发生之后,分区数变化
shuffle之后的分区(并行度),和算子有关,部分算子可以指定分区groupByKey,cogroup,distinct,reduceByKey,sortByKEY
简介
伯克利开源项目,由scala开发
迭代式应用的高性能需求而设计
Hadoop生态圈,Yarn框架支持
开发方式
spark-submit
spark-submit test_pySpark.py
参数
解释器交互
spark-shell
scala,兼容java
pyspark
python
sparkR
R
yarn apllication
-list
-kill appid
使用场景
Spark [Core]
批处理 ,RDD
类 MapReduce、Hive
Spark SQL
类SQL处理,交互式,返回Spark-DataFrame
类 Impala
依赖于Hive MetaStore
将HiveQL转换成了Spark的RDD操作
由SparkCore封装
Spark Streaming
流式处理;线上实时时序数据
类Storm
MLlib
机器学习
类Mahout
GraphX
图形算法 pagerank
与Hadoop的比对
Spark特点
(1)提供 Cache 机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的 IO 开销;
(2)提供了一套支持 DAG 图的分布式并行计算的编程框架,减少多次计算之间中间结果写到 Hdfs 的开销;
(3)使用多线程池模型减少 Task 启动开稍, shuffle 过程中避免不必要的 sort 操作并减少磁盘 IO 操作
利用数据集内存缓存以及启动任务时的低延迟和低系统开销
支持多种语言编程 scala|python|java,spark-shell解释器
vs Hadoop
Map+Reduce有限的处理方式 vs RDD, RDD灵活,多种数据集操作提供更多转换,如sort,join
中间过程写入磁盘,二次处理,磁盘IO开销大 vs 内存中处理,同时支持溢出
Reduce需等待Map vs 同一个partition中支持流水线处理
高延迟 vs 小batch处理,stream流处理
优势
低延时
高并发
低磁盘IO
不保存 少保存到磁盘
低依赖
stage内部并发,支持到core的高并发
分区内相同的转换,内部处理不需shuffle
容错性高
RDD重算,惰性操作,RDD可复用共享
cache持久化
抽象程度高
代码简洁,语义清晰;不需要在意具体实现细节,丰富api
逻辑清晰度高
DAG
灵活度高
多种操作转换api
看过哪些源码?
Spark Streaming
StreamingContext
应用案例
实质是micro batch的形式通过Spark处理
整合Kafka
DirectDStream
Receiver
SparkSQL
简介
前身: Shark (DataBricks)
即Hive on Spark.
旨在hive不局限于mapreduce计算框架,提升速度
2014.7停止,转向SparkSQL
Shark对于hive过多依赖
是一个项目,一个模块
将SQL转化为对RDD/DataFrame的操作
支持基于DF的类SQL的DSL处理
两个分支
HiveContext
hive语法解析器
sqlContext
SQL语法解析器,支持比hive多的语法
主要组成
Core(数据处理)、Catalyst(执行计划解析优化)、Hive(hive支持)、Hive-ThriftServer
支持SQL需要: Parser+Optimizer+Execution
特点
高度整合
SparkSQL可以和Spark无缝结合,同时支持多种语言开发python scala java R
多数据源整合
多数据源数据(hive,parquet,json,s3,hdfs)最终DF落地成table、view,然后join union等
与Hive相容
reuse Hive Metastore以及UDF等,支持HiveQL语法(HiveContext)
与 Hive
Hive依赖于MapReduce,SparkSQL基于Spark更快
两者都不负责计算,实际的计算引擎是MR or Spark
SparkSQL还依赖于Hive Metastore
SparkSQL不止可以通过Hive,还可以通过File/RDBMS/NoSQL,RDD获取数据
运行方式
sqlContext
spark.sql(), ss.sql()当前运行环境
DataFrame API
spark1.3开始有,此前是schemaRDD
DataSet API
spark1.6开始
DataFrame
DataFrame前身SchemaRDD(Spark1.3之前)
Spark1.3之后,不再继承于RDD,独立实现方法
DataFrame底层使用同一个优化器,跟用python,scalar等语言开发无关
生成
spark.sql生成
spark.createDataFrame(list)
spark.createDataFrame(k-v)
spark.createDataFrame(someRDD,[schema])
createDataFrame(rdd, StructType)
用structType可以指定数据类型
df=RDD.toDF(schema)
基本操作DSL
df.printSchema()
df.columns
df.count()
df.head(3)
跟df.take(3)结果一样
df.show()
默认20行
df.filter(isnull(col)
wtfDF.filter("Browser='Chrome'").show()
df.select().distinct()
select(col.alias('newname'))
df.select().display() 简单图表
df.selectExpr()
df.where(1=1).show()
df.dropDuplicates
df.drop('col')
删除一列
df1.join(df2,df1.key==df2.key,"inner")
等同df1.join(df2,"key","inner")
df.orderby(df.key.desc()).show(10)
.orderBy(wtfDF.Sessions.desc(),'Bounce Rate') 如果用.desc(), 需要引用df.col
.orderBy(desc('Sessions'),'Bounce Rate').show()
这种用法要用到from pyspark.sql.functions import *
df.sort('',ascending=False)
df.union
df.explode一行拆多行
df.withColumn() 列改动
df.withColumnRenamed('old','new')列改名
df.intersect()
df.subtract()
df.foreach(); df.foreachPartition()
df.sample()
聚合运算
df.groupBy("col").agg({"col1":"avg","col2":"max"})
df.groupBy('').agg()
同 max,min,avg,sum,count
默认生成字段名: "avg(col1)","max(clo2)", 需要进一步.withColumnRenamed()
开窗函数
df2.withColumn("rid",row_number().over(Window.partitionBy("height").orderBy("name"))).show()
生成表view
df.registerTempTable("newTable")
注意:只在当前的ss里面有效,spark.sql()引用不到,ss.sql()可以找到表
df.createOrReplaceTempView("viewName")
同样,只在ss.sql()中有效,在spark.sql()找不到
RDD DataFrame Dataset
与RDD
df.rdd转化成RDD, rdd.toDF(schema)
DF是Row的集合,包含了数据的结构信息(列名,类型)
RDD是java对象的合集 ; DF是分布式Row的合集,比RDD多了schema的信息
Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row
DF支持表的操作:df.select(), spark.sql("select ...")
RDD对于join合并等支持不够
DF提供比RDD更高层更丰富的api。 比如开窗函数,列名更改
RDD需要自己进行优化,DF有自动优化器处理Catalyst
通过Spark SQL Catalyst优化器(树变换框架)的最先进的优化和代码生成。
DF可以输出Table/View进行sql操作
python不支持dataset api
调优
broadcast
spark.sql.autoBroadcastJoinThreshold
基表不能被broadcast
?
Spark优化
spark-sumbit参数调优
driver-memory
Driver进程使用的内存
collect()使用driver的内存
Memory for driver (e.g. 1000M, 2G) (Default: 1024M)
Executor-memory
每个Executor进程申请的最大内存,num-Executor*Executor-memory决定了Job申请的最大内存
Memory per executor (e.g. 1000M, 2G) (Default: 1G).
spark.executor.instances
指定Job总共最多用到多少个Executor来处理
经验上由total cores/spark.executor.cores确定
Number of executors to launch (Default: 2).
executor的数量,在sparksession声明的时候,由executor.cores确定,后面怎么ss.conf.set()都没用
默认是2; 在yarn上,container=executor+driver
spark.executor.cores
每个executor申请的core数,决定了task在executor上的多线程上限
executor.instances * executor.cores 决定了total cores,允许并行跑的上限
理论上可以配置超过集群的vCore总数,有啥risk?
Number of cores per executor. (Default: 1 in YARN mode,or all available cores on the worker in standalone mode)
--conf spark.storage.memoryFraction
RDD持久化所需要的内存占比,默认0.6;根据持久化策略,以及shuffle内存的使用情况,可以调整
--conf spark.shuffle.memoryFraction
shuffle内存使用上限,默认0.2
--conf spark.default.parallelism
默认的每个stage里task同时运行的上限
建议是total cores的2-3倍
RDD的默认分区数,影响了sc.read的partition(28->56)
决定了shuffleRDD的默认分区数,如果算子么有指定的话
提高并行度,有利于加快job运行,但是也会增加磁盘IO和cpu压力
--conf spark.sql.shuffle.partitions
和spark.default.parallelism一样,都是决定分区并行度的
区别于spark.default.parallelism,这个是DataFrame操作中决定join/aggregation操作shuffle端的分区数
spark.defaul.parallelism对DF不起作用,spark.sql.shuffle.partiitons对RDD,以及非join/agg的DF操作不起作用
spark.sql.autoBroadcastJoinThreshold
集群调优
代码优化
RDD持久化
复用RDD, 且有必要进行cache
关联到spark.storage.memoryFraction 0.6占比
cache
=persist MemoryOnly
内存不够就不会进行持久化
persist
Memory Only
Memory And Disk
优先内存,不够了就存在磁盘上; 子RDD需要部分从磁盘读取
Memory Only Ser
内存only,但是需要序列化处理,更省内存
Memory And Disk Ser
需要序列化,省内存?
反序列化消耗时间,还有CPU
Disk Only
MemoryOnly_2 MemoryAndDisk_2
2代表在其他节点上保留一个副本,容错
unpersist
需要注意,不需要的RDD,要及时释放内存
broadcast
将小的RDD,broadcast(bc的不是RDD本身)到各个excutor内存中; 适用于大表join小表,小表进行broadcast,类似hive中的mapjoin
到executor级别,相比到task级别,节省了内存开销; 减小了大表的数据传输IO
算子选择
foreachPartitions vs foreach
减少连接数
mapPartitions优于map
reduceByKey() 优先于 groupByKey().mapValues(sum/len); groupByKey会全部先shuffle
思路:选择较优方案,避免shuffle后再汇总,先"combine"
处理数据倾斜
重新分区
减少分区:filter之后coalesce重新分区(减少task,减少启动消耗,可以shuffle=False)
增加分区:如果只是部分分区数据过多,可以尝试repartition增加分区数,扩大并行度
针对某个hash取余,过度集中
reduceByKey,distinct,cogroup,groupByKEY,sortByKey,都有partition参数,来调整stage的并行度
某key过多
针对join,可以用broadcast小表,减少shuffle
针对聚合运算(max/sum),某key过多,可以加前缀,打散并行处理后,去前缀再和其他汇总
针对join,某key过多,可以独立这些数据出来,加前缀,重新分区(两个表分区一致);另一个均匀的表,扩容n倍;join完后和剩下的其他数据合并
.sample()
.sample(要不要放回去重新抽,抽样比例).coungByKey
数据本地化
taskIDtoLocaltion
getPreferLocation
数据存储格式
ORC相比textFile等,拥有更好的查询性能和压缩比
自由主题
收藏
0 条评论
下一页