SparkEnv Source Code Analysis
2020-04-27 11:14:54
Notes on the SparkEnv source code
Outline / Contents
Conditions for choosing BypassMergeSortShuffleHandle:
1. !dep.mapSideCombine — no map-side combine is required (if mapSideCombine is set without an aggregator, Spark fails with "Map-side combine without Aggregator specified!")
2. dep.partitioner.numPartitions <= bypassMergeThreshold, where bypassMergeThreshold = spark.shuffle.sort.bypassMergeThreshold (default 200)
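Both checks live in one small method; the following is paraphrased from Spark 2.x's SortShuffleWriter.shouldBypassMergeSort (simplified, details vary by version):

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    // map-side combine needs aggregation, which the bypass path cannot do
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}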
minSystemMemory = (reservedMemory * 1.5).ceil.toLong = 450 MB.
If systemMemory < minSystemMemory, an error is thrown.
If spark.executor.memory is configured and is smaller than minSystemMemory, an error is thrown as well.
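The full computation these checks belong to, paraphrased from Spark 2.x's UnifiedMemoryManager.getMaxMemory (simplified; the real error messages are longer):

private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong  // 450 MB with the 300 MB default
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must be at least $minSystemMemory.")
  }
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least $minSystemMemory.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}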
ShuffleManager
MemoryManager Analysis
rpcEnv
registerShuffle
SparkEnv Creation Process Analysis
BlockManager
map
BypassMergeSortShuffleHandle
onHeapStorageMemory
MapOutputTracker
Reserved memory
On-heap: HeapByteBuffer
mmap
1. The code shows that a BlockManager and a BlockManagerMaster are created on both the Driver and the Executor. However, when the BlockManagerMaster is created, the second argument passed to registerOrLookupEndpoint is not a `new` expression but a by-name (no-argument anonymous) function. Therefore only the Driver actually constructs a BlockManagerMasterEndpoint; an Executor merely obtains an RpcEndpointRef pointing to the Driver's BlockManagerMasterEndpoint.
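The dispatch in question, paraphrased from Spark 2.x's SparkEnv.create (simplified):

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef =
  if (isDriver) {
    rpcEnv.setupEndpoint(name, endpointCreator)  // Driver: the endpoint is actually constructed here
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)   // Executor: only a ref to the Driver's endpoint
  }

val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),  // by-name: evaluated only on the Driver
  conf, isDriver)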
System memory
How data is written to disk:
buffer
compute()
getWriter
SortShuffleWriter
RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
BlockManager Analysis
maxMemory
reduceByKey
Meaning
data
new ShuffledRDD()
Values used when computing the memory allocation:
MemoryManager
getMaxMemory
Config item
kernel
SparkContext
Usable-memory fraction factor
maxOnHeapStorageMemory + maxOffHeapStorageMemory
1. Take the next record from the iterator and write it into the ShuffleExternalSorter.
2. The write into ShuffleExternalSorter proceeds as follows:
   1. If the number of records in ShuffleInMemorySorter exceeds spark.shuffle.spill.numElementsForceSpillThreshold (default Integer.MAX_VALUE), spill to disk.
   2. If ShuffleInMemorySorter has no free space, grow it: the new array takes used / 8 * 2 * 8L bytes (i.e. double the current size), obtained via allocatePage.
   3. unsafe.putInt()
   4. unsafe.copyMemory() — these two methods work on-heap or off-heap depending on whether the base object passed in is null (see the sketch after this list).
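A standalone illustration of that last point (plain JDK code, not Spark; UnsafeAddressingDemo is a made-up name): sun.misc.Unsafe treats a non-null base object as an on-heap, object-relative address, and a null base object as an absolute off-heap address.

import sun.misc.Unsafe

object UnsafeAddressingDemo {
  // Unsafe is not public API; obtain it reflectively (works on JDK 8)
  private val unsafe: Unsafe = {
    val f = classOf[Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    f.get(null).asInstanceOf[Unsafe]
  }

  def main(args: Array[String]): Unit = {
    // on-heap: base object is a byte array, offset is relative to the array header
    val bytes = new Array[Byte](16)
    val baseOffset = unsafe.arrayBaseOffset(classOf[Array[Byte]]).toLong
    unsafe.putInt(bytes, baseOffset, 42)
    println(unsafe.getInt(bytes, baseOffset))  // 42

    // off-heap: base object is null, offset is an absolute native address
    val addr = unsafe.allocateMemory(16)
    unsafe.putInt(null, addr, 42)
    println(unsafe.getInt(null, addr))         // 42
    unsafe.freeMemory(addr)
  }
}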
DiskStore
ByteBuffer.allocate
ShuffleManager Analysis
SortShuffleManager
MetricsSystem
onHeapExecutionMemory
Default value
create
memoryFraction
BlockInfoManager
BroadcastManager
sparkEnv
JVM
ByteBuffer.allocateDirect
Executor
getDependencies
maxMemory - onHeapStorageMemory
Shuffle manager: spark.shuffle.manager, default "sort" -> SortShuffleManager
if (curSize == capacity) { growArray() }
Memory manager: spark.memory.useLegacyMode, default false -> UnifiedMemoryManager
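How SparkEnv.create resolves these two settings, paraphrased from Spark 2.x (simplified):

val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)  // legacy mode
  } else {
    UnifiedMemoryManager(conf, numUsableCores)     // default since Spark 1.6
  }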
SerializedShuffleHandle
systemMemory
spark.testing.memory
BaseShuffleHandle
_env
getReader
MemoryStore
Runtime.getRuntime.maxMemory
Off-heap: DirectByteBuffer
UnifiedMemoryManager
HeapByteBuffer: a byte-array buffer allocated on the JVM heap
SerializerManager
BypassMergeSortShuffleWriter
spark.memory.fraction
DirectByteBuffer: a byte buffer allocated off-heap, outside the JVM heap
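A minimal demonstration of the two allocation paths (plain JDK API, not Spark code; ByteBufferDemo is a made-up name):

import java.nio.ByteBuffer

object ByteBufferDemo {
  def main(args: Array[String]): Unit = {
    val heapBuf = ByteBuffer.allocate(1024)         // HeapByteBuffer: backed by a byte[] on the JVM heap
    val directBuf = ByteBuffer.allocateDirect(1024) // DirectByteBuffer: backed by native (off-heap) memory
    println(s"heap: hasArray=${heapBuf.hasArray}, isDirect=${heapBuf.isDirect}")       // true, false
    println(s"direct: hasArray=${directBuf.hasArray}, isDirect=${directBuf.isDirect}") // false, true
  }
}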
file / disk
UnsafeShuffleWriter
A linear array structure: it maintains a curSize counter and lays out K and V entries consecutively, in insertion order (see the sketch below).
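A minimal sketch of this layout, modeled loosely on Spark's PartitionedPairBuffer (the PairBuffer class here is illustrative, not Spark's own):

class PairBuffer[K, V](initialCapacity: Int = 64) {
  private var capacity = initialCapacity
  private var curSize = 0
  // one flat array: key at index 2*curSize, value at 2*curSize + 1
  private var data = new Array[AnyRef](2 * initialCapacity)

  def insert(key: K, value: V): Unit = {
    if (curSize == capacity) growArray()  // same check as in the snippet above
    data(2 * curSize) = key.asInstanceOf[AnyRef]
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
  }

  private def growArray(): Unit = {
    capacity *= 2  // double the capacity and copy the existing entries over
    val newData = new Array[AnyRef](2 * capacity)
    System.arraycopy(data, 0, newData, 0, 2 * curSize)
    data = newData
  }

  def size: Int = curSize
}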
reservedMemory
1. Create an ExternalSorter; when mapSideCombine is set, pass dep.aggregator into it.
2. Depending on whether mapSideCombine is set, records are inserted into one of two containers: map or buffer (see the sketch below).
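Paraphrased from Spark 2.x's SortShuffleWriter.write (simplified):

sorter = if (dep.mapSideCombine) {
  // map-side combine: give the sorter the aggregator and key ordering
  new ExternalSorter[K, V, C](
    context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
  // no map-side combine: records are only partitioned here, not aggregated
  new ExternalSorter[K, V, V](
    context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)

Inside insertAll, an aggregator routes records into map (combining as it goes); otherwise they are appended to buffer.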
spark.testing.reservedMemory
DiskBlockManager
Conditions for choosing SerializedShuffleHandle:
1. supportsRelocationOfSerializedObjects — the serializer supports relocating serialized objects
2. !dependency.aggregator.isDefined — the dependency performs no aggregation
3. !(numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) — the number of partitions is at most 16777215 + 1 (2^24)
All three handles are chosen in registerShuffle; see the sketch below.
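Paraphrased from Spark 2.x's SortShuffleManager.registerShuffle (simplified):

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // few partitions, no map-side combine: write per-partition files and concatenate them
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // serialized (tungsten-sort) path: sort record pointers, not deserialized objects
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // fallback: deserialized sort-based shuffle
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}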
combineByKey
// ExternalSorter.insertAll, buffer branch (no map-side combine):
while (records.hasNext) {
  addElementsRead()
  val kv = records.next()
  buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
  maybeSpillCollection(usingMap = false)
}
// AppendOnlyMap.changeValue:
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
  val curKey = data(2 * pos)  // look up the key in the current array
  if (curKey.eq(null)) {
    // empty slot: create the initial value, store the key/value pair, bump the size
    val newValue = updateFunc(false, null.asInstanceOf[V])
    data(2 * pos) = k
    data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
    incrementSize()
    return newValue
  }
  // ... an existing key merges values; a collision advances pos = (pos + i) & mask
}
Driver
0.6
Return value: usableMemory * memoryFraction