Spark调优
2020-01-15 14:18:38 1 举报
AI智能生成
spark企业级调优
作者其他创作
大纲/内容
资源调优
driver-memory
executor-memory
num-executors
executor-cores
spark.default.parallelism
参数说明:该参数用于设置每一个stage的默认task数量,这个参数极为重要,如果不设置可能会直接影响你的spark作业性能
调优建议:spark作业的默认task数量为500-1000个较为合适,很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,spark默认设置的数量是偏少的,如果task数量偏少的话,就会导致你前面设置好的executor的参数都前功尽弃。试想一下无论你的executor进程由多少个,内存和cpu有多大,但是task只有1个或者10个,那么90%的executor进程根本就没有task执行,也就是说白白浪费了资源! 因此spark官网建议的设置原则是,设置该参数 num-executors * executor-cores的2~3倍较为合适,比如executor的总cpu核数为300个,那么设置1000个task是可以的,此时可以充分利用spark集群的资源
spark.sql.shuffle.partitions
参数说明:该参数用来设置sparksql参数shuffle之后 shuffle read task的个数
调优建议:默认是200,一般在大批量数据处理的场景下是默认参数太小了,需要调整,比如增加到400/500/600等
开发调优
RDD的重用和持久化
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
广播变量使用
尽量避免使用shuffle类算子
Broadcast+ map 来代替join
使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后进行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
使用Kryo优化序列化性能
描述:Spark默认采用Java的序列化器,不够高效,使用KryoSerializer
使用fastutil优化数据格式
描述:fastutil集合类,可以减小内存的占用,提供更快的存取速度
调节数据本地化等待时长
描述:spark.locality.wait,默认是3s,可以适当增大
数据倾斜调优
解决方案一:使用Hive ETL预处理数据
解决方案二:过滤少数导致倾斜的key
解决方案三:提高shuffle操作的并行度
解决方案四:两阶段聚合(局部聚合+全局聚合)
解决方案五:将reduce join转为map join
解决方案六:采样倾斜key并分拆join操作
解决方案七:使用随机前缀和扩容RDD进行join
解决方案八:多种数据倾斜的解决方案综合的灵活使用
shuffle相关参数调优
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
spark.storage.memoryFraction
默认值:0.6
参数说明:该参数用于设置RDD持久化数据在executor内存中的占比,默认是0.6,根据你选择的不同的持久化数据策略级别,如果内存不够了时,可能数据就不会持久化或者数据会写入到内存中
调优建议:如果spark任务中,有较多的RDD持久化数据操作,这个值可以适当提高一些,保证持久化的数据在能够容乃在内存中。如果spark任务中shuffle操作较多,那么这个参数的值适当降低一些比较合适,更更多的内存空间shuffle操作
spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。Spark1.6以后把hash方式给移除了,tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
0 条评论
下一页