spark
2019-09-03 10:12:14 0 举报
AI智能生成
spark
作者其他创作
大纲/内容
任务调度
专业术语
Application
一段应用程序,wordcount
job
一般与action类算子一一对应
stage
一个task
task
计算单元,一个task就是一个线程
Master
集群管理的主节点
Worker
集群管理从节点
Executor
计算进程,在计算进程中有一个线程池
ThreadPool
执行task线程池
划分
将job划分成一个个stage的目的就是为了pipeline计算模式\管道计算模式
理论上:每个stage下有多少分区,就有多少task
宽依赖往往对应着shuffle操作,需要运行过程中将同一个父RDD的分区传入不同子RDD分区中,中间可能涉及多个节点之间的数据传输;
窄依赖的每个父RDD分区只会传入到一个子RDD分区中, 通常可以在一个节点内完成转换。
流程
RDD
RDD中实际不存储数据,存储的是数据的位置以及处理数据的逻辑
task中逻辑就是将这个task所贯穿的各个partition中逻辑以递归函数展开式方式整合起来的fun2(fun1(textFile(block1)))textFile底层MR读文件的方法,一行行的读。
子主题
TaskScheduler任务失败,1+3(重试),都不行,退回DAGSchedluer重试4次(有问题的stage)都不行job失败
挣扎task(执行慢的)
推测,争抢任务
如何判断这个task是一个挣扎task?
1.5 100ms 0/75
taskScheduler正在调度100task,如果超过75%的task都执行完毕了,此时TaskScheduler会每个100ms进行进行一次判定,将执行完毕的task按照执行时间来升序排序,然后去中位数(2min),然后2min*1.5 = 3min,再过滤没有执行完毕的task,如果时间超过3min,那么这个task就是挣扎的task
计算1T数据,使用单进程单线程6h能够执行完毕,如果使用spark(多进程,多线程)来处理12h
原因:数据发生倾斜并且还开启了推测执行
数据倾斜
对于计算来说数据倾斜就是大部分task处理了少量的数据,小部分task处理大量数据
关闭推测执行
开启推测进行
数据有重复数据
关于etl类型业务,如何避免数据库中重复数据?
设置事务表!主键!
配置
Driver进程中存在DAGscheduler和Taskscheduler
DAGScheduler
1.根据依赖关系切割job
2.如果stage失败,重试stage
TaskScheduler
1.接收DAGScheduler的stage 遍历
2.失败task
3.拖后腿
负责为当前的Application申请资源
调度
粗粒度资源调度(spark)
描述:执行任务之前,会将所有的资源申请完毕,直到所有的任务全部执行完毕,才释放这部分资源
优点
在每一个task执行之前不需要自己去申请资源了,那么每一个task执行之前不需要自己去申请资源,那么每一个task执行时间变短
缺点
直到最后一个任务执行完毕才会释放资源,导致集群资源无法充分使用
细粒度资源调度(mapreduce)
描述:在任务执行之前不会先去申请资源的,直接进行任务的调度,每个task在执行之前自己去申请资源,申请到就执行,申请不到就能等待,当task执行完毕,就立马释放资源
优点
充分利用集群的资源
缺点
task
信息
Master和Worker之间有心跳机制的,Worker向Master发送心跳,发送心跳过程会不会携带当前Worker资源信息?不会,只会携带当前Worker的ID号。
累加器
用法
sc.accumulator(0)
注意
必须在Driver端定义
累加器只能在Driver端读,不能再Executor端读
广播变量
普通变量:集合副本数=task数
广播变量:集合副本数=Eexecutor数据
注意
1.广播变量必须在Driver端定义
2.广播只能在Driver端修改,不能再Executor端修改
用法
sc.broadcast(Array(1,2,3))
资源调度
1.默认情况下,一个worker为当前Application只启动一个Executor
2.这个Executor会使用1G内存和这个Worker所能管理的所有core
3.默认情况下,Executor启动方式轮询启动,有利于数据本地化
schedule()
1.startDriver
2.startExecutordOnWorkers
RDD
分区列表
每个分区都有一个计算函数
依赖于其他RDD的列表
数据类型(key-value)的RDD分区器
每个分区都有一个优先位置列表
spark比MR快的原因
1.spark支持pipline计算模式
2.spark资源复用
1core分配2~3 task
spark基于内存迭代
spark算子
repartition(numPartitions:Int):RDD[T]
重新分区
每一个stage task的个数与这个stage中最后一个RDD分区数有关
如果想要增加RDD的分区数,必须使用带有shuffle重分区方法coalesce(num,true)\repartition
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
如果想要减少RDD的分区数,可以使用不带有shuffle重分区方法coalesce(num,false)
提高并行度
1.textFile(path,numPartitons) ***
2.增加HDFS上block数
3.reduceBykey groupByKey shuffle 算子可以指定返回RDD分区数 reduceByKey(_+_,10)
4.重分区 coalescem repartition
5.自定义分区器
distinct
原生
rdd.map((_,1)).reduceByKey(_*_).map(_._1) = distinct
子主题
shuffle
聚合
Shuffle Write
上一个stage的每个map task 就必须保证将自己处理的当前分区中数据相同的key写入一个分区文件,可能多个不同分区文件
Shuffle Read
reduce task 就会生成从上一个stage的所有task所在的机器上寻找属于自己那些分区文件,这样就可以保证每个key所对应value都会汇聚到同一个节点上去处理和聚合。
问题
小文件过多,耗时低效IO操作
miner gc
full gc
oom
OOM,读写文件以及缓存过多
优化后hashshufflemanager
hashSuffle
普通机制
每个map task 都会产生R(reduce task 的个数)个磁盘小文件,M*R个磁盘小文件
磁盘小文件过多,有什么影响?
1.写磁盘的时候,会产生大量写文件对象
2.读文件的时候,会产生大量读文件的对象
3.频繁大量通信
对象过多 》 内存不足 》 GC 》 OOM
合并机制
每个Executor(1 core) 会产生R个磁盘小文件 磁盘小文件 = c*r 个 c * r > m* r ? 一个core一般给他分配2-3个task执行
处理大量小文件
SortShuffleManager
普通机制
内存数据结构 ! map array
SortShuffleManager bypass
bypass运行机制
shuffle reduce task 数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
参数
spark.shuffle.file.buffer
该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries
reduce OOM?
原因
1.shuffle聚合内存大小
2.reduce task 每次去map端数据量太小,来不及聚合
解决
1.spark.reduce.maxSizelnFlight小一点
2.增大shuffle聚合内存比例0.2 调优一般是成倍的增大或者减少
3.最大增大Executor内存
从Executor角度
reduce oom ?
原因
map端拉取数据量大,但是聚合少
解决
1.增加shuffle memory 0.2 -> 0.4
2.每次拉取数据量不要太大48 -> 24
3.增加Executor内存,水涨船高
业务
计算密集型
第一个解决办法最好,第三个解决办法最差
IO密集型
(需要大量持久化,或者使用缓存数据),第三种解决方案就是最好
Executor内存管理
0 条评论
下一页