spark的shuffle过程
2021-10-30 09:20:52 9 举报
制作不易
作者其他创作
大纲/内容
SortShuffleManager
3
4
7
8.重写spill
ShuffledRDD
BlockStoreShuffleReader
DAGScheduler
SortShuffleWriter,这就是第4步得到的writer
RDD
2
ShuffleMapTask,用于写文件(落盘)
1
insertAll()//第108行因为你往内存不断插入值,就有可能溢写磁盘,这就会根据条件来判断是否需要溢写到临时的溢写文件,数据量大的话就会生成很多tmep临时溢写文件maybeSpill()
// 第84行,myMemoryThreshold默认值内存门槛是5m if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 申请溢写} // 第93行,强制溢写,读取数据的值 超过了Int的最大值 shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold// 第98行,溢写 spill(collection) // 释放内存 releaseMemory()
ShuffleWriteProcessor(写处理器)
Spillable
override protected[this] def spill(collection: ...): Unit = { val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) spills += spillFile }
ExternalSorter
ResultTask(用于读之前写入的文件)
6
5
0 条评论
下一页