MapReduce工作流程
2020-11-15 20:24:45 0 举报
AI智能生成
MapReduce工作流程
作者其他创作
大纲/内容
Job提交
Driver中提交Job
submit提交
确认MR提交方式
创建cluster对象
初始化本地or集群环境
通过读取配置循环获取MR提交者
提交者提交
检查MR的输出路径
获取Job的临时工作区间
获取JobId
将前两步组合成job提交路径并创建
生成切片对象
获取InputFormat对象
通过InputFormat对象生成切片
生成切片方法
获取块大小
计算切片大小
计算切片数
根据切片数设置MapTask个数
写文件
提交job
最终删除job相关文件
Map端
构建真正执行的job
确认提交位置
读取job.xml文件
执行job线程的run方法
job run方法
读取 job.splitmetainfo文件
获取ReduceTask个数
根据切片的个数,创建对应的MapTaskRunnable任务
创建线程池
将MapTaskRunnable提交给线程池执行
迭代Runnable 提交给线程池执行
执行MapTaskRunnable里面的run方法
创建MapTask对象
执行MapTask的run方法
先判断是否有ReduceTask
runNewMapper
创建一个Mapper
创建InputFormat读取数据
获取切片详情,记录文件从哪个位置读到哪个位置
input 调用到TextRecordRedader
获取输出对象 判断写出到文件中或缓冲区中
执行Mapper的run方法
调用到自己写的Map方法
关闭缓冲区对象
准备缓冲区
创建缓冲区对象MapTask$MapOutputBuffer
初始化缓冲区
设置溢写百分比
设置缓冲区大小
设置索引缓存大小
设置索引排序算法(快排)
获取Key的排序比较器
压缩
combiner
启动溢写线程,并等待溢写
获取分区器对象
k,v在进入缓冲区之前要计算分区,分区与ReduceTask有关
获取reduce个数
如果reduce个数大于1,通过读取配置,获取分区器
如果获取到,则使用获取到的分区器
如果获取不到,则默认使用HashPartitioner分区器类
根据key的hash值对reduce的个数取余,得到key的分区号
如果reduce个数等于1,通过匿名内部类的方式,创建Partitioner对象, 在getPartition方法中 返回分区号 0
shuffle端
context.write(k,v)将kv写出,进入shuffle流程
收集K,V判断是否开始溢写,满足条件后溢写
通知溢写线程,开始溢写
排序并溢写
创建溢写文件
数据排序
分区溢写,完成后writer 关闭
判断索引内存是否超过默认,超过则写入磁盘
mapper端持续溢写
最后一次不满足溢写条件时,在关闭缓冲区时写出
关闭缓存区,内部调用缓存区刷新方法
调用排序并溢写方法
所有数据写出后开始归并操作
创建溢写文件数组,判断是否需要归并
创建最终归并输出文件
解析归并输出文件的文件
开始归并,判断是否需要combiner,并删除溢写文件
Reduce端
创建ReduceTaskRunnable对象
创建线程池对象,将ReduceTaskRunnable 提交给线程池执行
迭代Runnable 提交给线程池执行
执行ReduceTaskRunnable里面的run方法
创建ReduceTask对象
执行ReduceTask的run方法
runNewReducer
make a reducer
执行Reducer的run方法
调到自己写的reduce方法
是否分组
写出K,V
TextOutputForma 写出到文件
FSDataOutputSteam流的write方法写出数据
收藏
0 条评论
下一页