MapReduce详细笔记
2023-06-04 11:17:50 13 举报
AI智能生成
MapReduce是一种编程模型和处理大量数据的相关实现。它主要用于大数据处理,通过将复杂的数据处理任务分解为一系列可并行执行的子任务来提高处理效率。Map阶段负责将输入数据拆分成多个独立的部分,然后对每个部分进行处理并生成键值对。Reduce阶段则负责对所有Map阶段的输出进行合并和汇总,以得到最终的结果。MapReduce框架通常与分布式文件系统(如HDFS)一起使用,以实现数据的分布式存储和处理。
作者其他创作
大纲/内容
分支主题
总思维导图
\tMapReduce在整个Hadoop过程中负责计算部分,它分为两个部分,MapTask和ReduceTask,其中Shuffle介于其间,它的整体运作过程是,HDFS中的数据通过RecordReader,从输入的InputSplit中产生Key/Value,写入环形缓冲区,这里依据keyhash算法算出了各个Split的Partition,与此同时依照k进行排序,当环形缓冲区达到80%后,开始Spill(溢写)(此处可进行combiner),当整个文件Spill结束后,进行归并(Merge)并对同样的分区(Partition)依照K进行再排序(此处可进行combiner),最终将数据写到磁盘,Reduce方法开始,以K分组,对同组K中V进行计算,利用Reduce方法写出。
MapReduce总流程概述
HDFS的BLK是物理数据切块
MapTask的数据切片是逻辑分片,不涉及物理
切片的实质
默认等于blocksize
如何控制切片大小
针对每一个文件单独切片,若剩下的部分是否大于Blocksize的1.1倍(设定这个数字防止切片过小)
如何判断是否切片
汇总切片信息到切片规划文件
目的:防止大量小切片产生大量MapTask降低效率
应用:Driver中进行设定
CombineTextInputFormat切片机制
数据切片
TextInputFormat(逐行读取的String类型读取格式)
KeyValueTextInputFormat(键值对读取)
NLineInputFormat(多行读取,指定行数为N)
自定义InputFormat(根据需求,应用举例时我们用小文件举例)
类型
通过Configuration属性的set方法设定切割符
通过job实例的方法setInputFormatClass设置输入格式
应用:在Driver中进行设定
需求:将多个小文件合并成一个SequenceFile
重写isSplitable方法
返回一个new RecordReader对象
重写createRecordReader方法
自定义类继承FileInputFormat
作用:开流
FileSplit fs
FSDataInputStream fis
initialize
nextKeyValue
getCurrentKey
getCurrentValue
getProgress
自定义RecordReader类,继承RecordReader
步骤
自定义InputFormat的应用
FileInputFormat实现类
数据读取
InputFormat数据输入(MapTask)
MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value
Read阶段
该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
Map阶段
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中
Collect收集阶段
即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
Spill阶段
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
Combine
MapTask工作机制
默认Partitioner分区为key的hashCode和Integer.max_value进行与运算,并以numReduceTasks取余
默认Partitioner分区
重写getPartition方法
自定义类继承Patitioner
setPartitionerClass()-设置自定义的Partitioner类
setNumReduceTasks()-设定相应数量的ReduceTasks
Driver
自定义Partitioner
分区号从0开始
如果分区数大于ReduceTasks数,会产生空文件
如果ReduceTasks数小于分区数,则溢出
总结
Partition分区
默认依靠字典顺序快排
部分排序
只设置一个ReduceTask,处理大型文件效率很低
输出结果只有一个文件,文件内部有序
全排序
在Reduce端对key分组
辅助排序(GroupingComparator)
自定义排序中compareTo判断条件为两个时为二次排序
二次排序
排序类型
如果是bean对象,则在该类中实现(implements)WritableComparable接口,并在此接口中重写compareTo方法
实操需求:不同省份输出不同的手机号,在不同的省份内又用流量内部排序
自定义Partition,重写getPartition方法
setPartitionerClass
setNumReduceTasks
Driver类中
实操步骤
区内排序(Partition内)
自定义WritableComparable
需求:求每个订单中最贵的商品
Fields,Method,Getter & Setter
compareTo
Bean类实现WritableComparable接口
super(Bean.class,true)
Comparator类
setGroupingComparatorClass
Driver类,设置reduce的分组
实现步骤
GroupingComparator
WritableComparable排序
相当于Map阶段的Reduce
int count = 0;
for(IntWritable v :values){
count += v.get();
}
使用一个count计数,遍历相同key的值进行累加
写出
继承Reducer,重写reduce方法
Driver类中setCombinerClass
自定义Combiner实现的步骤
Combiner
Shuffle机制
ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
Copy阶段
在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
Merge阶段
各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
Sort阶段
reduce()函数将计算结果写到HDFS上。
Reduce阶段
ReduceTask工作机制
目的:控制最终文件的输出路径和输出格式
详细参考MapReduce代码文件
自定义OutputFormat
OutputFormat
如果不进行任何设置,默认的map个数是和blcok_size相关的。
default_num = total_size / block_size;
默认MapTask如何决定
1.如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。
2.如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。
3.如果输入中有很多小文件,依然想减少map个数,则需要将小文件merge为大文件,然后使用准则2。
如何设置MapTask个数
MapTask究竟有多少个
直接默认了1
默认ReduceTask如何决定
job.setNumReduceTasks()
如何设置ReduceTask值
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).
通过可用节点数,和每个节点的最大的Containers的数量乘积得出一个确定值,在此定值的0.95~1.75倍游动
Hadoop Documents
如何设置ReduceTask才合适
ReduceTask究竟设置多少个
MapTask与ReduceTask的数量之谜
MapTask与ReduceTask小结
Map端负责以连接字段做key,将需要的内容作为value,进行输出
Reduce端负责计算合并
实现思路(主要依靠reduce方法)
Map资源利用率不高,只进行分类工作
Reduce阶段压力过大,且容易产生数据倾斜
缺点
Map Join
解决方式
Reduce Join
Map Join适用于一张表十分小、一张表很大的场景。
使用场景
在Mapper的setup阶段,将文件读取到缓存集合中
在驱动函数中加载缓存
采用DistributedCache
思路
Join机制
数据清洗过程中,利用计数器记录各种数据
计数器应用与数据清洗(ETL)
MapReduce框架原理
MapReduce是分布式运算程序的编程框架
核心功能是将用户编写的业务逻辑代码和自带默认组建整合成一个完整的分布式运算程序
MapReduce定义
易于编程,利用接口
基于HDFS良好的扩展性
高容错性
适合大数据的离线处理
优点
不擅长实时计算
Spark是MapReduce的改进版,它可以进行流计算
不擅长流式计算
不擅长DAG(有向图)计算
MapReduce特点
整个过程切分为Map(切片,(K,V)对应工作),和Reduce(计算统计阶段)
MapReduce核心编程思想
负责程序过程调度和状态协调
MRAppMaster
负责Map阶段的整个数据处理流程
MapTask
负责Reduce阶段的整个数据处理流程
ReduceTask
MapReduce进程
重写map方法,规定入参格式
重写reduce方法,规定入参格式(其中有迭代器用作计数)
Reducer阶段
统合架构,启动MapReduce的工具
获取实例
设置驱动路径
设置Mapper和Reducer的路径
MapOutput的类型和最终输出的类型
设置输入输出路径
设置输出类型
提交任务
Driver阶段
WordCount基本三阶段
MapReduce概述
Java序列化附带其他信息,内容较多,不便于传输
为何不用Java的序列化
Fields、Getter&Setter、toString,
入参DataOutput out
out.writexxx
序列化方法(write)
入参 DataInput in
in.readxxx
反序列化方法(readFields)
写好自定义对象的Bean文件
set k值,set v值,v值不止一个,则多设置
Mapper类
若v值不只1个,则迭代器中累加多个值
封装时将两个v值都封装入Bean中
Reducer类
Driver类
自定义对象实现序列化接口
Hadoop序列化
MapReduce
0 条评论
回复 删除
下一页