Spark Shuffle: writer & reader combination
2024-12-04 10:47:24
Spark framework internals
Outline / Content
shuffle-reader
interruptibleIter
rdd
repartition:new ShuffledRDD
iter.next -> writer
sortByKey:new ShuffledRDD.setKeyOrdering
Spark's shuffle system: dep
task thread
fetchLocalBlocks
pipeline: nested iterators
HadoopRDD
shuffle
map-side join
BlockManager: memory and disk
getReader
blockManager.getBlockData
writer
initialize
task thread?
How does a task perform persist at runtime?
Why shuffle exists: 1) changing the number of partitions / changing parallelism; 2) aggregation; 3) key ordering
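The first trigger above can be seen in how hash partitioning works: change the partition count and most keys land on a different partition, so records must be redistributed across executors. A pure-Python sketch (not Spark's actual implementation; `partition_of` is an illustrative name mimicking `HashPartitioner`):

```python
# Sketch: changing numPartitions changes which partition a key maps to,
# so records have to move between executors -- that movement is the shuffle.
# (Integer keys are used so the result is deterministic across runs.)

def partition_of(key, num_partitions):
    """Mimics Spark's HashPartitioner: partition = hash(key) mod numPartitions."""
    return hash(key) % num_partitions

records = list(range(8))
before = {k: partition_of(k, 2) for k in records}   # 2 partitions
after = {k: partition_of(k, 4) for k in records}    # repartition to 4

# Keys whose partition changed must travel over the network.
moved = [k for k in records if before[k] != after[k]]
```

With 8 integer keys, half of them change partitions when going from 2 to 4 partitions, which is why `repartition` always builds a `ShuffledRDD`.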
aggregatedIter
SortShuffleManager
handle
groupByKey:combineByKey(mapSideCombine = false)
join usually triggers a shuffle
Bypass conditions: 1) no map-side aggregation; 2) downstream partition count below the bypass threshold (spark.shuffle.sort.bypassMergeThreshold, default 200). Applies to: groupByKey, sortByKey, repartition, custom combineByKey (without map-side combine)
0 partition
broadcast
aggregation: mapSideCombine = false
dep
resulttask
memory
agg
metricIter
FetchRequest: targetRequestSize = maxBytesInFlight (48 MB) / 5
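The 1/5 factor exists so that up to five requests can be in flight to different executors at once while staying under maxBytesInFlight. A simplified pure-Python sketch of how remote blocks are grouped into requests (illustrative names; the real logic lives in `ShuffleBlockFetcherIterator.splitLocalRemoteBlocks`):

```python
# Sketch: accumulate (block_id, size) pairs into one FetchRequest until the
# accumulated size reaches targetRequestSize, then start a new request.

MAX_BYTES_IN_FLIGHT = 48 * 1024 * 1024           # default 48 MB
TARGET_REQUEST_SIZE = max(MAX_BYTES_IN_FLIGHT // 5, 1)

def split_into_requests(blocks, target=TARGET_REQUEST_SIZE):
    """blocks: list of (block_id, size_in_bytes). Returns a list of requests,
    each a list of blocks whose total size reaches the target."""
    requests, cur, cur_size = [], [], 0
    for block_id, size in blocks:
        cur.append((block_id, size))
        cur_size += size
        if cur_size >= target:                    # request is big enough: flush
            requests.append(cur)
            cur, cur_size = [], 0
    if cur:                                       # leftover partial request
        requests.append(cur)
    return requests

# 30 blocks of 4 MB with a ~9.6 MB target -> groups of 3 blocks, 10 requests.
example_blocks = [("block_%d" % i, 4 * 1024 * 1024) for i in range(30)]
requests = split_into_requests(example_blocks)
```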
Upstream mapSideCombine == true: less data is produced, so fetch I/O is faster, downstream computation is faster, and memory is saved. reduce-style operators can "compress" the data (values collapse into one per key); group-style operators cannot — the data does not shrink, so enabling map-side combine there only wastes CPU. To avoid that waste, Spark decides for you and sets mapSideCombine == false (e.g. for groupByKey).
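The reduce-vs-group difference above can be shown in a few lines of plain Python (a simulation, not Spark API; `map_side_combine_sum` is an illustrative name):

```python
# Sketch: map-side combine shrinks the shuffled data only when values
# collapse per key (reduceByKey-style sums); grouping must keep every
# value, so pre-combining would save nothing.

from collections import defaultdict

map_output = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)]

def map_side_combine_sum(pairs):
    """reduceByKey-style map-side combine: one record per key survives."""
    combined = defaultdict(int)
    for k, v in pairs:
        combined[k] += v
    return sorted(combined.items())

shuffled_with_combine = map_side_combine_sum(map_output)   # 2 records cross the network
shuffled_without_combine = map_output                      # all 5 records cross the network
```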
read()
checkpoint
sort
Base
Physically: Task.run -> iter.next -> nested iterators (pipeline)
NettyBlockTransferService: a service that runs on every node
shuffleDep
groupByKey
Serialized (Unsafe) shuffle conditions: 1) no aggregator; 2) the serializer supports relocation of serialized data; 3) partition count limit: 16777215. Applies to: sortByKey, repartition
data10E
results
list10
result
From the user's perspective
ShuffleMapTask
driver
1 partition
resultIter
ShuffleBlockFetcherIterator
remoteRequests
splitLocalRemoteBlocks
HadoopRDD
persist: BlockManager
keyOrdering
Unsafe
aggregator [combine], three functions: first record (createCombiner), merging subsequent records (mergeValue), merging combiners (mergeCombiners); spill when over limit
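The three functions can be sketched in plain Python (a simulation of the semantics, not Spark's Aggregator itself; the function names match `combineByKey`'s parameters):

```python
# Sketch of map-side combine driven by the three Aggregator functions:
#   createCombiner - called for the FIRST record of a key
#   mergeValue     - folds each SUBSEQUENT value into the existing combiner
#   mergeCombiners - merges combiners from different partitions (reduce side)

def combine_by_key(pairs, create_combiner, merge_value):
    """Per-partition map-side combine over (key, value) pairs."""
    combiners = {}
    for k, v in pairs:
        if k not in combiners:
            combiners[k] = create_combiner(v)            # first record for key
        else:
            combiners[k] = merge_value(combiners[k], v)  # subsequent records
    return combiners

def merge_combiners(a, b):
    """Merge per-partition results, as the reduce side does after the shuffle."""
    out = dict(a)
    for k, c in b.items():
        out[k] = out[k] + c if k in out else c
    return out

# Two map partitions, summing per key:
p1 = combine_by_key([("x", 2), ("x", 4)], lambda v: v, lambda c, v: c + v)
p2 = combine_by_key([("x", 6), ("y", 1)], lambda v: v, lambda c, v: c + v)
merged = merge_combiners(p1, p2)
```

In real Spark the combiners live in a spillable map, and when it grows past the memory limit the contents are sorted and spilled to disk, which is the "spill" in the node above.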
curBlocks: grouped per Executor
executor
fetchUpToMaxBytes
1) checkpoint does not trigger a job immediately.
2) The checkpoint runs only after an action operator triggers runJob.
3) checkpoint then triggers its own separate runJob (pain point: the lineage logic is executed a second time).
4) So checkpoint should be used together with persist, because persist takes effect during the first task execution, while checkpoint lags behind the task/job.
*) Either way, persist and checkpoint share one common point: they target RDDs whose data is reused.
RDD: Resilient Distributed Dataset. Distributed: partitions. Resilient: a given partition may live in memory, on disk, or in HDFS.
In MapReduce, if one transformation's output is valuable, you run a standalone map-only MR job; its map output can only be stored in HDFS, and afterwards you can run hundreds of jobs with different logic against that result.
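Points 3 and 4 can be illustrated without Spark: checkpoint launches a second job over the same lineage, so without persist the upstream computation runs twice. A pure-Python simulation (a counter stands in for lineage recomputation; this is not Spark API):

```python
# Sketch: count how many times the "lineage" is computed with and without
# caching, mirroring why checkpoint should be paired with persist.

compute_calls = 0

def expensive_lineage():
    """Stands in for recomputing an RDD's lineage from the source."""
    global compute_calls
    compute_calls += 1
    return [x * x for x in range(5)]

# Without persist: the action's job and the checkpoint job each recompute.
action_result = expensive_lineage()
checkpoint_copy = expensive_lineage()
calls_without_persist = compute_calls          # lineage ran twice

# With persist: compute once, both jobs read the cached result.
compute_calls = 0
cached = expensive_lineage()
action_result = cached
checkpoint_copy = cached
calls_with_persist = compute_calls             # lineage ran once
```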
intermediate data?
sendRequest
compute
src -> iter; reader
task 1: compute(P)
persist
recordIter
shuffleblock
shuffle-writer
repartition
combineCombinersByKey
shuffle reader: ShuffledRDD
combineValuesByKey
new
true
BlockManager
compute
NettyBlockRpcServer
sortByKey
When is the fetch triggered?
ShuffleHandle
false
localBlocks
fetchBlocks
blockManager
task 0: compute(P)
remoteBlocks
reduceByKey:combineByKey(mapSideCombine: Boolean = true)
receive
aggregation: mapSideCombine = true
byPass
combineByKey:new ShuffledRDD.setAggregator.setMapSideCombine
OpenBlocks
task
reader
none
BlockStoreShuffleReader
disk
data source
RDD:def iterator
sendRpc
buffer: no aggregator (groupByKey)
map: has aggregator (combineByKey, reduceByKey)
mapSide
wrappedStreams
shuffle reader
data, list, taskBinary
client
RDD
……
Netty background
ser/des