Spark调优
2024-12-04 10:50:15 0 举报
spark调优
作者其他创作
大纲/内容
block2
flatMap(_.split(\" \"))
partition
Executor1
hive可在Hive中提前先将数据根据id聚合
task4
NODE_LOCAL 节点本地化
worker1每个进程3Core4G 内存
内存data
task
Worker
Executor2
rdd= sc.textFile(\"hdfs://\")rdd= rdd.cache() ---虽有cache(懒执行算子)但仍达不到PROCESS_LOCAL,只是标记了一下数据,当job执行完后,才会放入到内存中rdd.map().count()因为数据在HDFS的磁盘上,所以这种方式永远不会出现PROCESS_LOCAL的情况,最高就是NODE_LOCAL如何达到PROCESS_LOCAL级别?至少有2个job,第一个job把数据缓存到数据中,第二个job在缓存的基础上运行才能达到PROCESS_LOCAL级别
NO_PREF 无最佳位置 (task 在哪个Executor上执行都一样)
join算子广播变量将较小的RDD使用collect加载到Driver端,然后变为广播变量,在较大的RDD中处理数据
上报
HDFS
DriverDAGScheduleTaskSchedule
解决数据倾斜:1. ReduceByKey 双重聚合2. join算子 广播变量3. join算子 拆分RDD+随机前缀4. 提高并行度来解决数据倾斜 (优先调整并行度)5. filter过滤掉非法数据6. 把数据倾斜的问题提前解决
Stage2
Thread1:处理了大量数据Thread2:处理了少量数据
Node1
DriverDAGScheduleTaskScheduleMapOutPutTrackerMaster
shuffle writemap的结果写入磁盘
task1、task2在执行完后会shuffle write将结果写入磁盘,同时MapOutPutTrackerWorker会将地址等相关信息上报给MapOutPutTrackerMaster。这样MapOutPutTrackerMaster掌握了shuffle中间结果的位置。再调度task3、task4的时候,会先去MapOutPutTrackerMaster查看shuffle中间结果的位置,然后才分发task3和task4
Executortask1MapOutPutTrackerWorker
PROCESS_LOCAL 内存本地化
如何在一个节点上启动多个Executor (集群)在yarn上 添加 --num-executors 数量 在standalone上 添加 --executor-cores --total-executor-cores --executor-cores 2 --total-executor-cores 4 在节点上会启动2个Executor
Driver
reduceByKey双重聚合
可以调整并行度将2,成倍增加或者降低比如将2调整为4,发现执行速度快了再将4调整到8,发现执行速度慢了,再将8调整为6,在查看执行速度进行调整
Worker/节点
Mysql2
task1和task2Map TaskMap上如何计算找数据?通过HDFS上的 getBlockLocations()
spark-submit --master XXX --executor-cores 1 --total-executor-cores 3 --executor-memory 1G
task3
block1
Stage1
hive
代码调优
1.Spark task任务调优 (master和worker是物理节点,driver和executor是进程。)2.数据倾斜3.推测执行 (不推荐开启)4.代码调优
Mysql3
task3和task4Reduce Task
配置:1. 程序中配置 SparkSession (不建议,因为每次调节数值都要打包)2. 提交任务的时候配置 spark-submit --conf spark.locality.wait=9 (推荐,命令上可以直接改参数)3. spark-defaults.conf中配置 (不建议,因为是全局的,每个spark程序的最佳参数都不一致)
数据集中对应的key很均匀
worker3每个进程3Core4G 内存
共启动3个,因为设置了total-executor·-cores为3(如果不设置会根据work的cores数处理)会在每个Worker上启动启动一个executor(默认)为了更好的数据本地化
task2
Executortask2MapOutPutTrackerWorker
任务调度流程:如何调优
RDD4
worker2每个进程3Core4G 内存
task线程池
跨进程
spark要做根据id聚合此处容易发生数据倾斜
如果不设置参数,默认在3个worker上各启动一个Executor每个Executor占用2个core,1G内存
Worker1/节点
1.task计算的数据在本节点的其他Executor的内存中2. task计算的数据在本节点的磁盘上
计算框架要覆盖存储框架task1应在block1上运行task2应在block2上执行如果存储在node01上,spark运行在node02上,那么在计算的时候就要走网路io传输
mysql集群和Executor都在不同的节点上
1.Attempt: Task任务失败重试次数, 加上第一次执行的次数,一共4次,也就是第一次失败后重试3次2.Locality Level: 本地化级别 (本地化:计算逻辑在数据所在的节点上执行) 五种本地化级别: a. PROCESS_LOCAL 内存本地化 (最优的) 要配合cache等算子 b. NODE_LOCAL 节点本地化 c. NO_PREF 无最佳位置 d. RACK_LOCAL 机架本地化 (task计算的数据在机架的其他的节点内存中或磁盘上;需自行配置机架号) e. ANY 机房本地化 (最差的,跨机架)3. Duration 执行的时间4. Input Size/Records 执行的数据多大/记录数5. Write Time shuffle写的时间通过 Duration、Input Size/Records、Write Time的三个指标可以发现是否有数据倾斜
reduceByKey(_+_)
持久化算子:cachepersist(MEMORY_ONLY) = cache持久化的级别: (选择这三种就ok ,优先MEMORY_ONLY)MEMORY_ONLYMEMORY_ONLY_SER (压缩一下在放入memory)MEMORY_AND_DISK (优先往memory放,其次放DISK)Spark的WEB UI 中 node01:8080 通过Application查看storage就能看到有多少数据 %缓存到内中及磁盘上如果Memory使用紧张,Memory中的数据需要手动释放rdd.unpersist()
sc.textFile(\"hdfs://\")
spark要做根据Hive聚合后的数据在处理
2.NODE_LOCAL如果等待3s,重试5次 不能执行;再次降一级
spark.locality.wait=3S//等待节点本地化时间spark.locality.wait.node = spark.locality.wait//等待进程本地化时间spark.locality.wait.process = spark.locality.wait//等待机架本地化时间spark.locality.wait.rack = spark.locality.wait通过spark.locality.wait 来调优默认是3S 可以提高等待时间增加本地化的级别但是调优要有度!!!时间太高,会影响整体执行时间反而过长。
data
如何让你的Application飞起?(1)Spark集群管理更多的资源 standalone yarn(2)给Application分配更多的资源,增加Executor的个数 以及 加大每个Executor使用的资源 --executor-cores --executor-memory --total-executor-cores (3)5种增加并行度的方式(4)本地化调优(5)解决数据倾斜
什么是推测执行: 不推荐开启如果一个task 执行十分缓慢,TaskScheduler会在其他节点上新启动一个task,然后两个task比赛,谁先执行完,以谁的结果为准。怎么判定task是拖后腿的task?stage中 一共有100个task ,有75个task执行完毕。计算这75个task的执行时间的中位数, 如果该中位数是1s,凡是执行时间超过1.5倍中位数时间的task都是拖后腿的task。每隔100ms筛选拖后的task。参数:spark.speculation false (默认关闭的)spark.speculation.interval 100ms 间隔多长时间筛选拖后的taskspark.speculation.quantile 0.75 执行完成的百分比,spark.speculation.multiplier 1.5 超时中位数的倍数为什么不推荐开启?1. 数据倾斜+开启推测执行, 提交job,一直在执行,永远执行不完。 比如,一个task 执行1T的数据 , 被判定为拖后腿的task, 会在其他节点开启一个同样的task 依然执行1T的数据,但是同样慢,又被判定为拖 后腿的task,又开启了一个同样的task,导致集群资源全部被消耗完。2. job(往外部数据库写数据,导致重复数据) task 在60%的时候被定义为 拖后的 ,往数据库里写了60%的数据。 新启的task从 0%重新开始执行, 又重新开始往数据库里写数据。 (可以同过数据库的幂等处理解决这个问题)什么时候推荐开启?没有数据倾斜的情况下可以开启。因为每个core的执行快慢不同,谁先执行完以谁的结果为主。可提高运行速度
task1
Mysql1
RDD
RDD3
Node2
Worker2/节点
推测执行
把数据倾斜提前解决
1.PROCESS_LOCAL如果等待时间超过3s,Driver重新在此往Executor1中发送,5次后还不能执行判定Executor资源满了。降低一级数据本地化级别变为 NODE_LOCAL
stage的task的工作原理stage就是一组task的别名
RDD2
3.RACK_LOCAL依次类推
var rdd= sc.textFile(\"hdfs://\")var rdd2 = rdd.flatMap(_.split(\" \
磁盘data
1. 创建RDD的执行分区数: rdd = sc.textFile(\"path\
数据倾斜
提高数据并行度
0 条评论
下一页