hadoop
2023-06-07 13:49:25 1 举报
AI智能生成
hadoop
作者其他创作
大纲/内容
优化&新特性
HDFS—故障排除
集群安全模式
1)安全模式
文件系统只接受读数据请求,而不接受删除、修改等变更请求
2)进入安全模式场景
NameNode在加载镜像文件和编辑日志期间处于安全模式
NameNode再接收DataNode注册时,处于安全模式
3)退出安全模式条件
dfs.namenode.safemode.min.datanodes:最小可用datanode数量,默认0
dfs.namenode.safemode.threshold-pct:副本数达到最小要求的block占系统总block数的百分比,默认0.999f。
dfs.namenode.safemode.extension:稳定时间,默认值30000毫秒,即30秒
4)基本语法
集群处于安全模式,不能执行重要操作(写操作)。集群启动完成后,自动退出安全模式。
(1)bin/hdfs dfsadmin -safemode get (功能描述:查看安全模式状态)
(2)bin/hdfs dfsadmin -safemode enter (功能描述:进入安全模式状态)
(3)bin/hdfs dfsadmin -safemode leave (功能描述:离开安全模式状态)
(4)bin/hdfs dfsadmin -safemode wait (功能描述:等待安全模式状态)
NameNode故障处理
1)需求:
NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode
元数据丢失
2)故障模拟
kill -9 NameNode的进程号
rm -rf /opt/module/hadoop-3.1.3/data/dfs/name/*
3)问题解决
(1)拷贝SecondaryNameNode中数据到原NameNode存储数据目录
scp -r atguigu@hadoop104:/opt/module/hadoop-3.1.3/data/dfs/namesecondary/* ./name/
(2)重新启动NameNode
--daemon start namenode
(3)向集群上传一个文件确认是否还是安全状态
磁盘修复
案例一
1)重新启动集群
(2)集群启动后,立即来到集群上删除数据,提示集群处于安全模式
案例二:磁盘修复
需求
数据块损坏,进入安全模式,如何处理
原数据丢失
(1)分别进入hadoop102、hadoop103、hadoop104的/opt/module/hadoop-3.1.3/data/dfs/data/current/BP-1015489500-192.168.10.102-1611909480872/current/finalized/subdir0/subdir0目录,统一删除某2个块信息
(2)重新启动集群
(3)观察http://hadoop102:9870/dfshealth.html#tab-overview
会进入安全模式且提示在等待块信息加载
安全模式已经打开,块的数量没有达到要求
(4)离开安全模式
hdfs dfsadmin -safemode get Safe mode is ON
hdfs dfsadmin -safemode leave Safe mode is OFF
(5)观察http://hadoop102:9870/dfshealth.html#tab-overview
会提示缺失原数据的块信息
(6)找到对应块的元数据将元数据删除
(7)观察http://hadoop102:9870/dfshealth.html#tab-overview,集群已经正常
案例3:
需求
模拟等待安全模式
(1)查看当前模式
hdfs dfsadmin -safemode get Safe mode is OFF
(2)先进入安全模式
bin/hdfs dfsadmin -safemode enter
(3)创建并执行下面的脚本
先进入安全模式等待
再上传一个文件
(4)再打开一个窗口,离开安全模式
bin/hdfs dfsadmin -safemode leave
(5)再观察上一个窗口
Safe mode is OFF
(6)此时HDFS集群上已经有上传的数据了
HDFS—多目录
DataNode多目录配置
1)DataNode可以配置成多个目录,每个目录存储的数据不一样(数据不是副本)
上传数据时只会在指定的目录有
2)具体配置
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data1,file://${hadoop.tmp.dir}/dfs/data2</value>
</property>
集群数据均衡之磁盘间数据均衡
增加一块硬盘
刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性)
(1)生成均衡计划
hdfs diskbalancer -plan hadoop102
(2)执行均衡计划
hdfs diskbalancer -execute hadoop102.plan.json
(3)查看当前均衡任务的执行情况
hdfs diskbalancer -query hadoop102
(4)取消均衡任务
hdfs diskbalancer -cancel hadoop102.plan.json
HDFS—集群扩容及缩容
服役新服务器
需要在原有集群基础上动态添加新的数据节点
环境准备
(1)在hadoop100主机上再克隆一台hadoop105主机
(2)修改IP地址和主机名称
(3)拷贝hadoop102的/opt/module目录和/etc/profile.d/my_env.sh到hadoop105
(4)删除hadoop105上Hadoop的历史数据,data和log数据
(5)配置hadoop102和hadoop103到hadoop105的ssh无密登录
服役新节点具体步骤
(1)直接启动DataNode,即可关联到集群
在白名单中增加新服役的服务器
(1)在白名单whitelist中增加hadoop104、hadoop105,并重启集群
(2)分发
(3)刷新NameNode
hdfs dfsadmin -refreshNodes
4)在hadoop105上上传文件
服务器间数据均衡
1)企业经验:
经常在某些节点上提交数据或者有新的服务器扩展时,发生数据倾斜需要执行集群均衡命令
2)开启数据均衡命令
sbin/start-balancer.sh -threshold 10
参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
3)停止数据均衡命令
sbin/stop-balancer.sh
由于HDFS需要启动单独的Rebalance Server来执行Rebalance操作,所以尽量不要在NameNode上执行start-balancer.sh,而是找一台比较空闲的机器。
添加白名单
1)在NameNode节点的/opt/module/hadoop-3.1.3/etc/hadoop目录下分别创建whitelist 和blacklist文件
2)在hdfs-site.xml配置文件中增加dfs.hosts配置参数
3)分发配置文件whitelist,hdfs-site.xml
4)第一次添加白名单必须重启集群,不是第一次,只需要刷新NameNode节点即可
5)在web浏览器上查看DN,http://hadoop102:9870/dfshealth.html#tab-datanode
6)二次修改白名单,增加hadoop104
7)刷新NameNode
hdfs dfsadmin -refreshNodes
8)在web浏览器上查看DN,http://hadoop102:9870/dfshealth.html#tab-datanode
黑名单退役服务器
适用
黑名单:表示在黑名单的主机IP地址不可以,用来存储数据。
企业中:配置黑名单,用来退役服务器。
步骤
1)编辑/opt/module/hadoop-3.1.3/etc/hadoop目录下的blacklist文件添加退役节点
如果白名单中没有配置,需要在hdfs-site.xml配置文件中增加dfs.hosts配置参数
2)分发配置文件blacklist,hdfs-site.xml
3)第一次添加黑名单必须重启集群,不是第一次,只需要刷新NameNode节点即可
4)检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点
5)等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役
6)如果数据不均衡,可以用命令实现集群的再平衡
sbin/start-balancer.sh -threshold 10
Hadoop企业优化
MapReduce优化方法
数据输入
在执行mr前合并小文件
采用combinetextinputformat作为输入
Map阶段
提高溢写触发的内存量,提高单次溢写量,减少次数
减少合并次数
增大并归路数,减少并归次数
不影响业务的前提下,在map之后采用combine减少IO次数
Reduce阶段
合理设置map和reduce的个数
太少
导致task等待,延长处理时间
太多
map和reduce抢占资源,导致处理超时等错误
设置map和reduce共存
在map运行到一定时候,让reduce参与进来,减少reduce的等待时间
规避使用reduce
因为reduce在连接数据时需要消耗大量网络资源
合理设置reduce端的buffer
内存资源充足的情况下,可设置部分数据不由磁盘就直接输送到reduce
I/O传输
数据压缩,减少io
lzo
snappy
使用sequencefile二进制文件
数据倾斜问题
1.数据频率倾斜
数据文件量远大于平均值
2.数据量倾斜
数据大小远超平均值
减少倾斜的方法
抽样和范围分区
自定义分区
combiner
采用mapjoin,避免reducejoin
常用的调优参数
mapred-default.xml
task资源上限则会被杀
cpu核数、
并行度
buffer相关
Shuffle的环形缓冲区大小,默认100m
环形缓冲区溢出的阈值,默认80%
yarn-default.xml
内存量
cpu
Hadoop小文件优化方法
Hadoop小文件弊端
一方面会大量占用NameNode的内存空间
另一方面就是元数据文件过多,使得寻址索引速度变慢。
MapTask的处理时间比启动时间还小,白白消耗资源。
4.4.2 Hadoop小文件解决方案
1)小文件优化的方向
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
(4)开启uber模式,实现jvm重用
2)Hadoop Archive
是一个高效的将小文件放入HDFS块中的文件存档工具,能够将多个小文件打包成一个HAR文件,从而达到减少NameNode的内存使用
3)CombineTextInputFormat
CombineTextInputFormat用于将多个小文件在切片过程中生成一个单独的切片或者少量的切片。
4)开启uber模式,实现JVM重用。
开启uber模式,在mapred-site.xml中添加配置
Hadoop扩展
集群间数据拷贝
1)scp实现两个远程主机之间的文件复制
scp -r hello.txt root@hadoop103:/user/atguigu/hello.txt // 推 push
scp -r root@hadoop103:/user/atguigu/hello.txt hello.txt // 拉 pull
2)采用distcp命令实现两个Hadoop集群之间的递归数据复制
bin/hadoop distcp hdfs://hadoop102:8020/user/atguigu/hello.txt hdfs://hadoop105:8020/user/atguigu/hello.txt
102 -》105
小文件存档
hdfs小文件存档缺点
小文件虽然占资源是按实际大小,但是小文件数量多会占用单量namenode的元数据存储
解决小文件存储
hdfs存档、har存档
吧小文件存为一个整体,但是内部又是独立的
从而减少namenode 存储量
实操
(1)需要启动YARN进程
(2)归档文件
把/user/atguigu/input目录里面的所有文件归档成一个叫input.har的归档文件,并把归档后文件存储到/user/atguigu/output路径下。
hadoop archive -archiveName input.har -p /user/atguigu/input /user/atguigu/output
(3)查看归档
hadoop fs -ls har:///user/atguigu/output/input.har
需要借助协议,才能查看
(4)解归档文件
hadoop fs -cp har:/// user/atguigu/output/input.har/* /user/atguigu
根据协议接归档为原来的形式
回收站
1)回收站参数设置及工作机制
2)启用回收站
core-site.xml,
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
3)查看回收站
回收站目录在hdfs集群中的路径:/user/atguigu/.Trash/….
4)通过程序(idea或者其他)删除的文件不会经过回收站,需要调用moveToTrash()才进入回收站
5)通过网页上直接删除的文件也不会走回收站。
6)只有在命令行利用hadoop fs -rm命令删除的文件才会走回收站
hadoop fs -rm -r /user/atguigu/input
7)恢复回收站数据
hadoop fs -mv /user/atguigu/.Trash/Current/user/atguigu/input /user/atguigu/input
就是从回收站移回来
Yarn
概念
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn资源调度器
Yarn基础架构
resoucemanager
1.集群资源的分配与调度
2.管理nodemanager
3.管理/启动APPmaster
4.处理客户端的请求
nodemanager
1.处理resoucemanager的命令
2.处理APPmaster的命令
3.管理单个节点上的资源调配
app master
1.管理单个任务的资源申请和分配
2.对任务的监控和容错
container
封装了某个节点上的资源,cpu,ram,ROM,等
Yarn工作机制
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
作业提交全过程
(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
Yarn调度器和调度算法
先进先出调度器(FIFO)
单队列,根据提交作业的先后顺序,先来先服务
优点:简单易懂。
缺点:不支持多队列,生产环境很少使用。紧急job得不到优先处理
容量调度器(Capacity Scheduler)
1.多队列,每个队列采取fifo策略,每个队列分配一定的资源
2.容量保证,管理员可设置最低和最好资源使用
3.灵活性,空闲队列资源可出借,直到自己有新job需要处理
4.多租户:支持多用户共享集群,多任务并行
防止同一用户作业占据所有资源,会对同一用户所占资源有所限制
资源分配算法
队列资源分配
从root开始,深度优先算法,寻找资源占用率最低的队列分配资源
作业资源分配
按作业的优先级和提交时间来优先分配资源
容器资源分配
安容器优先级分配资源
若优先级一样
则按任务和数据机架感知来
公平调度器(Fair Scheduler)
优点:与容量调度器一致
与容量调度器不同点
1.核心调度策略不同
容量调度器:优先占用率低的
公平调度器:优先资源缺额比例大的
2.每个队列资源配置设置
容量调度器:fifo,drf
公平调度器:fifo,fair,drf
资源分配算法
不加权
(递归)每个对象多余的资源相加/缺额对象个数
加权
(递归)每个对象多余的资源总和/缺额对象的权重和 * 缺额对象对应的权重
设置
yarn-default.xml文件
Yarn案例实操
Yarn生产环境核心参数配置案例
yarn-site.xml配置参数
主要是cpu核数,内存上下限,虚拟核,虚拟内存设置
容量调度器多队列提交案例
需求
default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。
配置多队列的容量调度器
在capacity-scheduler.xml中配置
yarn.scheduler.capacity.root.queues
添加hive
yarn.scheduler.capacity.root.default.capacity
改为40
yarn.scheduler.capacity.root.default.maximum-capacity
default的是60
设置新队列属性
yarn.scheduler.capacity.root.hive.capacity
yarn.scheduler.capacity.root.hive.user-limit-factor
yarn.scheduler.capacity.root.hive.maximum-capacity
yarn.scheduler.capacity.root.hive.state
yarn.scheduler.capacity.root.hive.acl_submit_applications
yarn.scheduler.capacity.root.hive.acl_administer_queue
yarn.scheduler.capacity.root.hive.acl_application_max_priority
yarn.scheduler.capacity.root.hive.maximum-application-lifetime
yarn.scheduler.capacity.root.hive.default-application-lifetime
2)分发配置文件
3)重启Yarn或者执行yarn rmadmin -refreshQueues刷新队列,就可以看到两条队列
向Hive队列提交任务
hadoop入门
概念
大数据:海量,高增长率和多样化的信息
hadoop
分布式系统架构
hadoop生态圈
三大发行版本
Apache
Apache版本最原始(最基础)的版本,对于入门学习最好。2006
官网地址:http://hadoop.apache.org
下载地址:https://hadoop.apache.org/releases.html
cloudera
Cloudera内部集成了很多大数据框架,对应产品CDH。2008
官网地址:https://www.cloudera.com/downloads/cdh
下载地址:https://docs.cloudera.com/documentation/enterprise/6/release-notes/topics/rg_cdh_6_download.html
hortonworks
Hortonworks文档较好,对应产品HDP。2011
官网地址:https://hortonworks.com/products/data-center/hdp/
下载地址:https://hortonworks.com/downloads/#data-platform
优势
高可靠性
多个副本
高拓展性
可添加节点,增强计算能力和存储能力
高效性
将一个任务分成多个并行的task运行,效率更高
高容错性
能够自动将失败的任务分配给其他节点
hadoop组成
1.x版本前
MapReduce
集计算和资源调度为一体
hdfs
分布式文件管理系统,数据存储
common
辅助工具
2.x版本后
(3.x后无多大变化)
MapReduce
只负责计算
yarn
负责资源的调度
hdfs
分布式文件管理系统,数据存储
common
辅助工具
作用
主要解决大数据的存储、采集和计算分析问题
特点
volume 大量
velocity 高速
variety 多样
value 低价值密度
组成
Hdfs
概念
Hadoop Distributed File System,简称HDFS,是一个分布式文件系统。
框架
NameNode
存储文件的元数据
包括文件名、文件目录结果、文件属性、块大小、块所在的DataNode等
DataNodes
存储文件的块数据,以及校验和
Secondary NameNode
每隔一段时间备份元数据
Yarn
概念
Yet Another Resource Negotiator简称YARN ,另一种资源协调者,是Hadoop的资源管理器。
框架
ResourceNode
管理整个集群资源
DataManager
单个节点资源管理的老大
Application Master
单个任务运行的老大
Container
容器,封装了一个任务运行所需的资源
MapReduce
概念
(1)Map阶段并行处理输入数据
(2)Reduce阶段对Map结果进行汇总
环境搭建
虚拟机安装
VMware安装
linux安装
在安装系统之前需要检查自己虚拟机的bios的虚拟化是否打开
centos
cpu、内存、硬盘分配
设置VMware nat上网方式设置
IP
Windows vm8 IPV4 设置
hosts文件设置
虚拟机内部网络设置
vim /etc/sysconfig/network-scripts/ifcfg-ens33
#IP的配置方法[none|static|bootp|dhcp](引导时不使用协议|静态分配IP|BOOTP协议|DHCP协议)
IP地址
网关
域名解析器
systemctl restart network
ifconfig
vim /etc/hostname
vim /etc/hosts
C:\Windows\System32\drivers\etc路径下改hosts
改完后重启 reboot
远程终端工具
作用
公司中使用的真实服务器或者是云服务器,都不允许除运维人员之外的员工直接接触,因此就需要通过远程登录的方式来操作
文件传输工具
xftp
hadoop运行模式
本地模式
单机运行,演示官方案例,学习用
在服务器上直接运行官方案例jar包,输入输出文件都是在本地
伪分布式
单机运行,一台服务器拥有所有集群的功能
完全分布式
多台服务器组成集群
搭建:
(1)准备3台客户机(关闭防火墙,修改静态IP、主机名称)
(2)安装JDK
(3)配置环境变量,jdk和hadoop都要,改完重启或者source一下profile
(4)安装Hadoop
Windows端也可以装,配置环境变量
上传解压hadoop安装包
(5)配置环境变量
/etc/profile.d/my_env.sh
改完后 source /etc/profile 让环境变量生效
(6)配置集群
三台DataNode、一台namenode、一台resoucemanager、一台secondarynamenode、一台historysever
NameNode和SecondaryNameNode不要安装在同一台服务器
ResourceManager也很消耗内存,不要和NameNode、SecondaryNameNode配置在同一台机器上。
hadoop环境即配置文件需要拷贝到集群其他服务器
编写脚本xsync
scp -r $pdir/$fname $user@$host:$pdir/$fname
在一个结点上可以将该节点内容覆盖复制分发到另外的其他结点
rsync -av $pdir/$fname $user@$host:$pdir/$fname
将上述命令封装为在一个结点上可以将该节点内容覆盖复制或者有差异更新到另外的其他结点
配置文件
默认配置文件
[core-default.xml]
[hdfs-default.xml]
[yarn-default.xml]
[mapred-default.xml]
自定义配置文件
核心配置文件
core-site.xml
指定NameNode的地址
指定hadoop数据的存储目录
配置HDFS网页登录使用的静态用户为atguigu
HDFS配置文件
hdfs-site.xml
nn web端访问地址
2nn web端访问地址
CheckPoint时间设置
MapReduce配置文件
mapred-site.xml
指定MapReduce程序运行在Yarn上
历史服务器端地址
历史服务器web端地址
YARN配置文件
yarn-site.xml
指定MR走shuffle
指定ResourceManager的地址
环境变量的继承
开启日志聚集功能
设置日志聚集服务器地址
设置日志保留时间为7天
配置works
/opt/module/hadoop-3.1.3/etc/hadoop/workers
同步结点信息
xsync /opt/module/hadoop-3.1.3/etc
常用端口
NameNode内部通信端口 8020 / 9000/9820
NameNode HTTP UI 9870
MapReduce查看执行任务端口 8088
历史服务器通信端口 19888
(7)单点启动
格式化namenode
hdfs namenode -format
不是第一次格式化则要先把集群停了,把data、logs和/temp删了
启动hdfs
sbin/start-dfs.sh
在配置了ResourceManager的节点(hadoop103)启动YARN
sbin/start-yarn.sh
(8)配置ssh
ssh免密登录
免密登录原理
a结点将一个密码切成上下文两段
即公钥id_rsa.pub 私钥id_rsa
将公钥发送给指定结点b
a发出私钥加密文件,b会去找a的公钥解密
解密完成后,将以公钥加密的数据返回给a
ssh另一台电脑的IP地址
如:ssh hadoop102
/home/atguigu/.ssh
ssh-keygen -t rsa
在ssh目录下创建公钥和私钥
ssh-copy-id 其他免密登录结点
将公钥分发
ssh-copy-id 指定结点
其他结点希望别的结点也能免密登录
则也要生成公私钥并分发公钥
(9)群起并测试集群
整体启动/停止HDFS
start-dfs.sh/stop-dfs.sh
整体启动/停止YARN
start-yarn.sh/stop-yarn.sh
分别启动/停止HDFS组件
hdfs --daemon start/stop namenode/datanode/secondarynamenode
启动/停止YARN
yarn --daemon start/stop resourcemanager/nodemanager
编写群起脚本
myhadoop
免密登录到namenode群启hdfs
免密登录到resoucemanager启动yarn
chmod +x myhadoop
xsync分发
jpsall
免密登录到每台结点查看进程状态
chmod +x jpsall
xsync分发
三个重要端口
http://hadoop102:9870
Web端查看HDFS的NameNode
元数据,数据块信息等
http://hadoop103:8088
Web端查看YARN的ResourceManager
资源使用情况
http://hadoop102:19888/jobhistory
查看job历史日志
配置集群时间同步
产生时间偏差,导致集群执行任务时间不同步。
找一个机器,作为时间服务器,所有的机器与这台集群时间进行定时的同步,生产环境根据任务对时间的准确程度要求周期同步。
操作
服务状态和开机自启动状态
sudo systemctl status ntpd
sudo systemctl is-enabled ntpd
sudo vim /etc/ntp.conf
将上面的注释去掉(并将192.168.1.0改成192.168.10.0)
restrict 192.168.10.0 mask 255.255.255.0 nomodify notrap
给上面的内容添加注释
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
添加
server 127.127.1.0
fudge 127.127.1.0 stratum 10
修改/etc/sysconfig/ntpd
SYNC_HWCLOCK=yes
让硬件时间与系统时间一起同步
配置完启动时间同步
sudo systemctl is-enabled ntpd
sudo systemctl enable ntpd
自启动
sudo systemctl status ntpd
sudo systemctl start ntpd
启动
常见错误
1)防火墙没关闭、或者没有启动YARN
2)主机名称配置错误
3)IP地址配置错误
4)ssh没有配置好
5)root用户和atguigu两个用户启动集群不统一
6)配置文件修改不细心
7)不识别主机名称
8)DataNode和NameNode进程同时只能工作一个。
删除data,logs、temp后重新格式化
9)执行命令不生效,粘贴Word中命令时,遇到-和长–没区分开,导致命令失效。
尽量不要粘贴Word中代码
10)jps发现进程已经没有,但是重新启动集群,提示进程已经开启。
原因是在Linux的根目录下/tmp目录中存在启动的进程临时文件,将集群相关进程删除掉,再重新启动集群。
11)jps不生效
全局变量hadoop java没有生效。解决办法:需要source /etc/profile文件
12)8088端口连接不上
cat /etc/hosts
注释掉如下代码
#127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
#::1 hadoop102
HDFS
hdfs概述
HDFS产出背景及定义
HDFS产生背景
需要一种系统来管理多台机器上的文件
分布式文件管理系统
HDFS定义
Hadoop Distributed File System
一个分布式文件管理系统
适合一次写入,多次读出的场景。
HDFS优缺点
优点
高容错性
数据保存为多个副本分布于多个datanode
误删,或者一个出问题,还有其他nodedata课帮忙修复
适合处理大量数据
数据规模大
pb级数据
数据文件数量多
百万规模以上
可构建在廉价机器(服务器)上,通过副本机制,提高可靠性
缺点
不适合低延时数据访问
如毫秒级
因为mr机制慢
无法高效对大量小文件进行存储
默认分块是按文件切分的,文件太小也会占用一个块
可以在输入端合并小文件,或者map阶段利用combine先进行汇总部分
不支持并发写入,随机修改
一个文件只允许有一个写,不允许多线程同时写
仅仅支持append追加,不允许随机修改
HDFS组成架构
NameNode
管理hdfs的名称空间
配置副本策略
管理数据块映射信息
处理客户端的读写请求
DataNode
储存实际的数据块
Namenode发出命令,DataNode执行客户端的读写操作
Secondary NameDode
辅助nn,定期合并fsimage和edits,并推送给nn
紧急情况下辅助恢复nn
client
文件切分成块再上传
与nn交互获取数据位置
与dn交互获取读写操作
提供命令管理hdfs
HDFS文件块大小(面试重点)
hdfs文件在物理上是按块存储的,块的大小根据传输速率决定
传输时间为寻址时间的100倍最佳
磁盘的传输速率靠近128、512、1024则块大小就设为该数值
hadoop3.x默认是128m
块大小设置太大太小问题
块大小设置的太小,块的数量相对会更多,而块存储起始位置需要寻址时间,块多寻址时间就更长
块太大,磁盘的传输时间明显的大于寻址时间,因此处理块数据时,会需要很多时间,比较慢
块大小的设置,最好根据磁盘的传输速率来决定
hdfs的shell操作(开发重点)
hadoop fs 具体命令 OR hdfs dfs 具体命令
hadoop fs -help rm
hadoop fs -mkdir /sanguo
-moveFromLocal
从本地剪切粘贴到HDFS
-copyFromLocal
从本地文件系统中拷贝文件到HDFS路径去
-put:等同于copyFromLocal
生产环境更习惯用put
-appendToFile
追加一个文件到已经存在的文件末尾
-copyToLocal
从HDFS拷贝到本地
-get
等同于copyToLocal,生产环境更习惯用get
-ls: 显示目录信息
-cat:显示文件内容
-chgrp、-chmod、-chown
Linux文件系统中的用法一样,修改文件所属权限
-mkdir
创建路径
-cp
从HDFS的一个路径拷贝到HDFS的另一个路径
-mv
在HDFS目录中移动文件
-tail
显示一个文件的末尾1kb的数据
-rm
删除文件或文件夹
-rm -r
递归删除目录及目录里面内容
-du
统计文件夹的大小信息
-setrep
设置HDFS中文件的副本数量
Windows上配置hadoop
配置环境变量
重启
验证Hadoop环境变量是否正常。双击winutils.exe,如果报如下错误。说明缺少微软运行库(正版系统往往有这个问题)。再资料包里面有对应的微软运行库安装包双击安装即可。
创建maven工程并导入依赖
hadoop jar包
log日志配置
test测试
src/main/resources目录下
新建一个文件,命名为“log4j.properties”
上传下载案例
1.创建客户端对象/FileSystem.get
URI
要连接的集群的Namenode地址
conf
参数设置
默认是hdfs上的参数设置
代码端设置 > pom.xml > hdfs-site > 默认配置
user
操作用户
默认是Windows用户
需要在设置里更改为指定用户
-DHADOOP_USER_NAME=atguigu
2.上传/下载
fs.copyFromLocalFile
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
fs.copyToLocalFile
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
3.关闭流
fs.close();
hdfs读写流程(面试重点)
文件写入
(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
(2)NameNode返回是否可以上传。
(3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3逐级应答客户端。
(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
一个packet 64kb,packet里分为512字节的chunk 和4字节的校验码
(8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
网络拓扑-节点距离计算
机架感知
如果客户端在副本所在的节点则选取此节点
第二个副本在另一个机架的随机节点
第三个副本和第二个在同一机架
客户端不在集群
则随机选取一个结点作为第一结点
第二节点和第三结点在另一机架的随机节点
读数据流程
(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
NameNode和SecondaryNameNode
NN和2NN工作机制
原理
长时间添加数据到Edits中,会导致该文件数据过大,效率降低
一旦断电,恢复元数据需要的时间过长。
定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低
引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。
一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。
NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
Fsimage和Edits解析
FsImage
hdfs文件系统的一个元数据永久性检查点,包括hdfs的所有目录和文件inode的序列化信息
oiv查看Fsimage文件
hdfs oiv -p 文件类型 -i镜像文件 -o 转换后文件输出路径
Edits文件
存放hdfs所有更新操作的的路径,文件系统客户端所有写的操作会优先存入edits
oev查看Edits文件
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
CheckPoint时间设置
1)通常情况下,SecondaryNameNode每隔一小时执行一次。
2)一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次。
DataNode
DataNode工作机制
(1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2)DataNode启动后向NameNode注册,通过后,周期性(6小时)的向NameNode上报所有的块信息。
(3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟+30秒没有收到某个DataNode的心跳,则认为该节点不可用。
(4)集群运行中可以安全加入和退出一些机器。
数据完整性
(1)当DataNode读取Block的时候,它会计算CheckSum。
(2)如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。
(3)Client读取其他DataNode上的Block。
(4)常见的校验算法crc(32),md5(128),sha1(160)。
(5)DataNode在其文件创建后周期验证CheckSum。
掉线时限参数设置
datanode掉线或者出问题无法与nn通信
nn不会将dn立马判定为死亡,默认等待10.5分钟的超时时长
MapReduce
MapReduce概述
MapReduce定义
一个分布式运算程序的编程框架
核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序
MapReduce优缺点
优点
MapReduce易于编程
良好的扩展性
高容错性
适合PB级以上海量数据的离线处理
缺点
不擅长实时计算
不擅长流式计算
不擅长DAG(有向无环图)计算
MapReduce核心思想
(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
MapReduce进程
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
官方WordCount源码
常用数据序列化类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritable
MapReduce编程规范
mapper阶段
1.用户自定义mapper的类要继承自己的父类
2.map的输入方式是以键值对的方式
3.业务逻辑写在map()中
4.输出形式也是键值对
5.map()方法循环调用每一个键值对
reduce阶段
1.用户自定义的reduce要继承自己的父类
2.reduce的输入类型是map的输出类型
3.reducer的业务逻辑写在reduce中
4.reducer对每一对键值对调用一次reduce方法
driver阶段
相当于yarn的客户端,将整个程序提交到yarn上,提交的是封装了MapReduce程序及相关的运行参数的job对象
WordCount案例实操
按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。
1.创建工程
2.添加依赖
3.src/main/resources目录下
新建文件log4j.properties添加信息
4.创建类
5.编写程序
编写mapper
按行切割数据,通过上下文对象写出
编写reducer
按统计切割后的数据,通过上下文对象写出
编写driver
1 获取配置信息以及获取job对象
2 关联本Driver程序的jar
3 关联Mapper和Reducer的jar
4 设置Mapper输出的kv类型
5 设置最终输出kv类型
6 设置输入和输出路径
提交到集群测试
(1)用maven打jar包,需要添加的打包插件依赖
(2)将程序打成jar包
(3)修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.1.3路径。
(4)启动Hadoop集群
(5)执行WordCount程序
在Windows上向集群提交任务
(1)添加必要配置信息
1 获取配置信息以及封装任务
设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
指定MR可以在远程集群运行
指定yarn resourcemanager的位置
2 设置jar加载路径
3 设置map和reduce类
4 设置map输出
4 设置map输出
5 设置最终输出kv类型
6 设置输入和输出路径
7 提交
(2)编辑任务配置
操作用户
参数
(3) 打包
(4) 将Jar包设置到Driver中
(4)提交并查看结果
Hadoop序列化
序列化概述
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
hadoop序列化的特点
紧凑 :高效使用存储空间。
快速:读写数据的额外开销小。
互操作:支持多语言的交互
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
自定义bean对象实现序列化接口(Writable)
当键值对中需要包含大于两个字段时,加入对象
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致。
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。
序列化案例实操
编写流量统计的Bean对象
编写Mapper类
编写Reducer类
编写Driver驱动类
MapReduce框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
map阶段的并行度由切片数量决定
默认切片大小是=块大小
Job提交流程源码和切片源码详解
1建立连接
创建提交Job的代理
判断是本地运行环境还是yarn集群运行环境
2 提交job
创建给集群提交数据的Stag路径
获取jobid ,并创建Job路径
拷贝jar包到集群
计算切片,生成切片规划文件
向Stag路径写XML配置文件
提交Job,返回提交状态
FileInputFormat切片源码解析(input.getSplits(job))
1.寻找文件目录
2.遍历目录文件
3.按文件调取getsplit方法获取切片
获取文件大小
计算切片大小
默认情况下=block大小
设置的最大值小于block
则会减小切片的大小,提升maptask数量
设置的最小值大于block
则会增大切片大小,降低maptask数量、
切片
大于切片大小的1.1倍则切
小于等于切片大小的1.1倍则为一片
切片信息写到规划文件
inputSplit
只记录了切片的元数据信息
切的起始位置,长度,所属节点等
4.提交切片规划文件到yarn上,mrappmaster(任务老大)会根据文件计算maptask大个数
FileInputFormat切片机制
TextInputFormat
FileInputFormat实现类
TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等
TextInputFormat
默认的FileInputFormat实现类
按行读取每条记录。
key:行的偏移量
LongWritable类型
value:行内容数据
不包括任何行终止符(换行符和回车符),Text类型
CombineTextInputFormat案例实操
将输入的大量小文件合并成一个切片统一处理
job.setInputFormatClass(CombineTextInputFormat.class);
将默认的textinputformat改为combinetextinputformat
MapReduce工作流程
1.待处理数据文件即路径
2.inputformat切片
按照配置文件套切片公式
max(minsize,min(maxsize,blocksize))
得到切片信息
片数
元数据
起始位置
长度
节点信息等
3.客户端提交信息(submit)
job.split
wc.jar
job.xml
4.读取数据
mapper端的map方法一次读取一行数据
inputformat--recorderreader--linerecorderreader读取(key(偏移量),value(行内容))
对每次读取内容进行逻辑运算
得到key(指定字段),value(指定字段)
字段可以是包装后的一个对象
该对象应实现writable接口可序列化
context.write(k,v)
outputcollector
Shuffle机制
Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
Partition分区
将统计结果按照不同分区输出到不同文件
自定义类继承Partitioner
重写getPartition方法
控制分区代码,几个分区几个reduce
在job驱动中设置自定义分区
根据自定义分区数在job驱动中设置reducetask个数
默认是hashcode值对reduce个数进行模运算
Partition分区案例实操
1)需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
2)需求分析
3)自定义分区类
4)在驱动函数中增加自定义数据分区设置和ReduceTask设置
CombineTextInputFormat切片机制
背景
TextInputFormat切片机制
以文件为单位切片,文件太小任然是一片,因此大量小文件处理时,会寻址时间远大于处理时间的百分之一,效率低下
1)应用场景
将多个小文件从逻辑上规划到一个切片中
2)虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3)切片机制
虚拟存储过程及切片过程
所有小文件大小总和与设置的最大值进行比较,若小于则一个片
若大于
大于一倍小于两倍
则将总的均分为两部分
大于两倍
先按设置的最值切一份
然后再看是否大于一倍小于两倍
若是
则均分
不是
继续切,直到大于一倍小于两倍后均分
WritableComparable排序
maptask
map方法写出的键值输入到环形缓冲区
当输入的值达到环形缓冲区的一定值,环形缓冲区中该部分数据会进行快排为有序状态溢写为一个磁盘文件
直到该maptask任务所有数据都处理完后,对磁盘文件进行多路归并排序形成一个文件
reducetask
从每个maptask归并后的文件读取数据到内存,当读取的数据超过reducetask设置的一定值,则会溢写到磁盘文件,该文件是将多个maptask文件归并后得到一个有序文件
如果磁盘上文件额达到了一定的阈值,则也会对目前maptask溢写到磁盘的文件进行归并排序为一个更大的有序文件
当所有数据处理完毕,会分配一个reducetask归并处理所有内存和磁盘上的文件
WritableComparable排序案例实操(全排序)
排序都是对键值对的key值进行排序
key可按默认排序方式排序
也可自定排序方式(key值是一个对象)
将key值需要包含的字段包装为对象
自定义排序类实现WritableComparable
提供无参构造
生成属性的getter和setter方法
实现序列化和反序列化方法,注意顺序一定要一致
out.writeLong(this.upFlow)
this.upFlow = in.readLong();
重写ToString,最后要输出FlowBean
重写compareTo方法
编写Mapper类
编写Reducer类
编写Driver类
设置自定义分区器
设置对应的ReduceTask的个数
WritableComparable排序案例实操(区内排序)
Combiner合并
combiner的父类就是reducer
和reducer的区别在于运行位置不同
combiner是在每一个maptask上进行数据的局部汇总
reducer是对全局数据进行汇总
使用条件
combiner汇总后的数据不影响最终数据输出
combiner输出数据的格式要和maptask输出的数据格式一致
自定义Combiner实现步骤
①自定义一个Combiner继承Reducer,重写Reduce方法
②在Job驱动类中设置
job.setCombinerClass(WordCountCombiner.class);
Combiner合并案例实操
统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能
方案一
(1)增加一个WordCountCombiner类继承Reducer
(2)在WordcountDriver驱动类中指定Combiner
job.setCombinerClass(WordCountCombiner.class);
方案二
(1)将WordcountReducer作为Combiner在WordcountDriver驱动类中指定
job.setCombinerClass(WordCountReducer.class);
OutputFormat数据输出
OutputFormat接口实现类
fileoutputformat
textoutputformat
filteroutputformat
dboutputformat
应用场景
输出到SQL,HBASE等存储
自定义OutputFormat案例实操
1.自定义类继承outputformat
2.改写recorderwriter
具体改写输出数据的方法write
3.job.setOutputFormatClass(LogOutputFormat.class)
MapReduce工作机制
MapTask工作机制
Read阶段
MapTask通过InputFormat获得的RecordReader(底层是lineRecordReader),从输入InputSplit中解析出一个个key/value。
Map阶段
将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
Collect收集阶段
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
Spill阶段
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。
经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。
如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
Merge阶段
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask工作机制
Copy阶段
ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
Sort阶段
ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
Reduce阶段
reduce()函数将计算结果写到HDFS上。
ReduceTask并行度决定机制
1)设置ReduceTask并行度(个数)
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
2)实验:测试ReduceTask多少合适
理论上和maptask数量差不多效率更高
需要考虑需求和场景
0个时,输出结果文件个数和map端输出结果一致
默认值是1个,输出的数据只有一个文件,一般用于汇总
此时设置了分区,但是不会分区,分区数由reducer个数决定
具体个数设置还要看集群性能
Join应用
Reduce Join
Map端的主要工作:
为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作
在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
Reduce Join案例实操
合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方案:Map端实现数据合并。
Map Join
Map Join适用于一张表十分小、一张表很大的场景。
优点:在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
(2)在Driver驱动类中加载缓存。
Map端Join的逻辑不需要Reduce阶段
Map Join案例实操
MapReduce开发总结
1)输入数据接口:InputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
2)逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
3)Partitioner分区
(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
4)Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个Reduce。
(4)二次排序:排序的条件有两个。
5)Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
6)逻辑处理接口:Reducer
reduce() setup() cleanup ()
7)输出数据接口:OutputFormat
(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。
Hadoop数据压缩
概述
1)压缩的好处和坏处
压缩的优点:以减少磁盘IO、减少磁盘存储空间。
压缩的缺点:增加CPU开销。
2)压缩原则
(1)运算密集型的Job,少用压缩。
压缩吃cpu资源
(2)IO密集型的Job,多用压缩。
IO多,cpu可能就空闲,则可压缩
MR支持的压缩编码
压缩方式选择
LZO 需要安装 LZO算法 后缀.lzo 支持切片 需要建索引,还需要指定输入格式
Snappy 直接使用 Snappy算法 后缀.snappy 不支持切片 和文本处理一样,不需要修改
压缩效率
压缩算法 原始文件大小 压缩文件大小 压速 解速
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
压缩解压缩都快,但是压缩效率低
snappy
i7 64位,单核,压缩速度250m/s,解压速度500 MB/s
压缩解压缩都快,但是压缩效率低
压缩方式选择
Gzip压缩
优点:压缩率比较高。
缺点:不支持Split;压缩/解压速度一般。
Bzip2压缩
优点:压缩率高;支持Split。
缺点:压缩/解压速度慢。
Lzo压缩
优点:压缩/解压速度比较快;支持Split。
缺点:压缩率一般;想支持切片需要额外创建索引。
Snappy压缩
优点:压缩和解压缩速度快。
缺点:不支持Split;压缩率一般。
压缩位置选择
可选三个阶段
输入端
必须要支持切片的压缩方式
bzip2 和 lzo
map输出或者reduce输入端
此处需要多考虑速率问题,压缩时间不应太长
lzo和snappy
reduce输出端
若考虑永久存储课选压缩率高的
bzip2和gzip
若还要考虑下一个mapreduce输入,则还要考虑是否可切片
压缩参数配置
core-site.xml中设置输入端压缩
mapred-site.xml中设置
map输出端压缩
reduce输出端的压缩
压缩实操案例
Map输出端采用压缩
Reduce输出端采用压缩
0 条评论
下一页