大数据知识点
2020-09-08 11:30:26 4 举报
AI智能生成
大数据知识点
作者其他创作
大纲/内容
Kafka
kafka是高性能,高吞吐,低延迟的分布式消息队列
架构
produce
生产者
Block
topic
partition
topic的数据会分割成一个或多个partition
messager
topic会根据分配策略选择分区,将数据添加到指定partition的末尾队列
follower
从节点副本,只可读不可修改,并每隔一段时间与leader进行同步
leader宕机会从follower中选举一个leader
replication
备份数据
每个block可有多个topic
consumer
消费者
执行流程
produce会选择一个topic生成消息,会依据分配策略选择partition,将数据添加到指定的partition文件末尾
consumer会选择一个topic(指定的offset)进行消费,消费后保存offset
消费者组
每个消费者都会维护自身的offset
每个消费者都有对应的消费组
消费者模型
quene
publish-subscribe
一条消息在group内只被消费一次
ISR机制
kafka中每个partition中会对应一个leader和多个follwer
kafka写入数据只往leader写,follwer只负责同步leader数据
副本机制
ack
ack=-1
produce会等待follower将leader的数据同步完成,才会commit提交,数据一致性高,但性能低
ack=1
produce等待leader接收到数据即可commit,性能高,数据一致性低
ack=0
produce只管发送,不需要等待leader和follower的落地,就可以commit
ISR队列,每一个leader都会有一个ISR队列,由leader动态维护,列表中的follower数据和leader基本同步
ISR数据一致性的标准
follower超过10s未向leader同步数据,移出
replica.lag.max.ms
follower和leader相差4000条,移出
replica.lag.max.messages
leader挂了,优先从isr列表进行leader选举,并且将之前的leader移除ISR列表
kafkaAPI
生产者
指定broker位置找到kafka集群
通过protition设置参数,创建produce对象
keyedMessage根据key进行hash求模分区
producer.send(message)
消费者
High level consumer
消费者设置相关properties参数
zk地址
消费者group
是否自动提交偏移量
自动提交偏移量的时间间隔
设置消费者第一次消费的位置
指定主题消费,选择线程数
通过迭代器获取消息
消息持久化
本质是可以消息持久化
temp/kafka-logs
该目录下会按照topic的分区进行存储,一个分区一个目录
存储格式为多个semgent文件
.log
存储数据
.index
索引文件
二分法快速定位对应offset所在的索引文件,然后通过索引到log文件查找对应的数据
高吞吐本质
页缓存pagecatch
produce发送数据给kafka时,会优先保存到broker节点的pagecatch,并定时flush到磁盘上
顺序写
顺序写比随机写快,保证写的高效率
零拷贝
相比常规的磁盘IO读文件流程少掉了scoket缓存和用户区缓存(减少上下文应用切换)
kafka数据重复和丢失
数据丢失
消费者
自动提交offset,这时如果consumer宕机,数据还没消费完,就会丢失数据
解决
手动提交offset
生产者
produce发送数据到kafka中时,kafka会优先将数据存储到PageCatch中,需要定时flush到本地的磁盘上,这时如果系统宕机,flush还没操作,就会丢失数据
解决
增加flush频率,治标不治本
设置ack=-1或all,保证数据必须写入leader并follower同步才算真正的写入成功,在继续写入,但性能会降低
备份机制,防止数据丢失
数据重复
原因
自动提交offset慢了,消费完数据后,发生宕机,下次从zk中读取上次提交的offset,会导致丢失数据
解决
保证幂等性
事务
手动提交offset
Strom
Spark
简介
spark是处理大规模数据集的分布式框架
Spark Core
RDD
分布式数据结果集
五大特性
RDD由一系列partition组成
函数作用在partition上
RDD之间有一系列依赖关系
分区器作用在KV格式的RDD
RDD提供了最佳计算位置,“计算移动,数据不移动”的理念
算子
转换算子
map
flatMap
reducebykey
groupbykey
mapPartition
sortbykey
触发算子
foreach
take
collect
foreachPartition
countBykey
countByValue
count
reduce
saveAsTextFile
持久化算子
checkpoint
spark的容错机制
执行流程
1.job在执行完会从finalRDD向前追溯,找到checkpoint标记的RDD
2.找到后,会重新开启一个job,到标记的RDD结束
3.将这些结果缓冲到指定的位置上
cache
数据存放在内存中,但容易掉电易失
cache() = persist() = memory_only
persist
diskonly的和副本的我们不推荐选择
注意:checkpoint new job的结果可交由cache缓存,可减少new job的计算
宽窄依赖
RDD之间的依赖关系
宽依赖
父RDD和子RDD之间的partition的关系是一对多
产生shuffle
窄依赖
父RDD和子RDD的partition的关系是多对一
不产生shuffle
stage
在spark中RDD之间的依赖关系会形成DAG(有向无环图),DAG Sceduler会将DAG进行切分,遇到宽依赖会切分成stage,每个stage有一个或多个task,stage封装到taskSet,在将taskSet发送到TaskSceduller运行
pipeline计算模式
stage并行度
由FinalRDD决定
资源任务调度和任务调度
DAG Sceduler
RDD之间的依赖关系会形成DAG,DAG sceduler
Task Sceduler
遍历TaskSet,将task发送给worker节点(executor进程)
重试机制
task如果失败,会重试3次,3次还失败,task所在的stage会失败
stage会失败后,会由DAG Sceduler重试4次,4次还是失败,这个job也会失败,任务结束
内存管理
静态内存
内存大小写死,容易造成内存分配不合理
统一内存
注意
互相借用的内存,必须要等任务结束完释放内存在可以使用
共享变量
广播变量
将RDD计算好的结果广播到内存中,方便后续操作
累加器
源码解析
运行模式
local
standalone
yarn
执行流程
1.Client会先申请启动Driver
2.在去RM申请AM的启动
3.RM会先随机一台NM开启
4.NM开启后在去RM申请启动一批容器
5.RM返回给AM一批NM节点
5.Driver发送task,Executor汇报任务情况给Driver
Spark SQL
简介
sparkSql可以让spark支持原生sql,编写操作hive,hbase,mysql,hdfs等存储,也使得spark脱离了hive的限制
区别
Spark On Hive
hive存储,spark解析
Hive On Spark
hive存储和解析分析,spark执行
dataSet与DataFrame
谓词下推
先执行各个条件,过滤掉部分数据,降低join要查询的数据量,提高查询效率
序列化问题
什么是序列化?
序列化是一种用来处理对象流的机制,可以将对象类进行流化,流化后的对象可进行读写操作或用于网络传输
父类实现了序列化,子类默认实现序列化,如父类未实现序列化,但子类实现了序列化还是无效
被static,transient修饰的不能被序列化
序列化版本不一致也会导致反序列化报错
自定义UDF和UDAF
udf
自定义函数
udaf
聚合函数
开窗函数
row_number over(partition by )
Spark Streaming
spark Streaming是具有高性能,高吞吐,可容错的准实时流处理框架
sparkStreaming和storm区别
1.storm是纯实时处理,sparkStreaming是准实时处理,spark的延迟比storm高
2.storm只能处理简单的逻辑业务,sparkStreaming由于有RDD,可以进行复杂的业务逻辑
算子操作
finalRDD
transform
对DStream做RDD到RDD的任意操作
updateStateByKey
每一个key维护一份state状态,sparkstream会对已存在的key进行state状态更新
窗口操作
每隔5s一个batch,窗口长度为15,窗口滑动时间为10s
窗口长度和滑动间隔必须是batchInterval的**整数倍**
sparkstream整合kafka(0.8)
receiver模式
执行流程
1.Executor Recevier task接收kafka发送过来的数据
2.将数据发送到不同节点上的task,将数据写入到本地磁盘
3.备在将zookeeper中的偏移量更新
4.将写入本地磁盘的位置告诉Driver端的Recevier Tracker,由Recevier Tracker将分发task执行
出现问题
当Driver进程挂掉后,Executor进程也会被杀掉,当更新完zookeeper消费偏移量时,Driver进程挂掉后,会导致数据找不到问题,相当于数据丢失问题
解决
WAL预写日志机制,在接受过来数据备份到其他节点时,同时备份到HDFS上,但HDFS的读写性能太低,会增加job的执行时间,对与任务的执行提高了延迟度,并且还会造成数据重复消费问题
在sparkstream中如果使用kafka整合不推荐使用receiver模式,易重复消费
Driect模式
sparkStream+kafka的Driec模式是利用kafka存数据的一方,kafka不是被动的存数据而是主动取数据,Driec模式中数据偏移量是sparkStream自己管理的,默认存在内存中,当然如果设置了checkpoint目录,那么偏移量会存在checkpoint中,也可以利用zookeeper来存。
并行度
由kafka的topic中的partition的分区数而定
对比
1.被动将数据接收到executor,当任务堆积时,数据存储问题
2.receiver模式不能手动管理offset
3.开启WAL机制,会将数据同时写入到HDFS上,影响job的执行速度,还会造成数据重复消费问题
4.Driect模式是利用kafka主动存数据,并且offset交由sparkstream自己管理,当kafka挂掉了,下次重启会从内存中的offset开始读起,避免数据丢失和重复消费
5.Driect模式还可以通过checkpoint来管理offset
但是逻辑代码发生改变时,就不能使用checkpoint来管理offset,需要自己手动管理,这时可使用redis来进行管理
Spark 调优
资源调优
在部署spark集群中指定资源分配的默认参数
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
SPARK_WORKER_INSTANCES
在提交Application的时候给当前的Application分配更多的资源
提交命令选项
--executor-cores
--executor-memory
--total-executor-cores
配置信息
spark.executor.cores
spark.executor.memory
spark.max.cores
动态分配资源
并行度调优
如果读取的数据在HDFS中,降低block大小,相当于提高了RDD中partition个数sc.textFile(xx,numPartitions)
sc.parallelize(xxx, numPartitions)
coalesce
减少分区数
repartition
增加分区数
代码调优
避免重复的RDD
多次使用的RDD进行持久化
避免shuffle类的算子
持久化算子
广播变量
使用高性能的算子
使用map-side的预聚合的shuffle操作
spark Shuffle调优
buffer大小——32KB
shuffle read拉取数据量的大小——48M
shuffle聚合内存的比例——20%
拉取数据重试次数——5次
重试间隔时间60s
Spark Shuffle的种类
解决数据倾斜问题
ETL预处理数据
前提:只要保证hive表中的数据是不会数据倾斜的,spark就不会遇到数据倾斜
map join
map端预聚合,减少reduce拉取的数据量,避免内存溢出
两次阶段聚合
1.局部聚合
给key加随机数,将key打散分布到不同的reduce上聚合
2.全局聚合
将聚合好的数据去掉随机数,在进行聚合
小表join大表
小表在前,大表在后,减少join要查询的数据量
大表join大表
将要发生倾斜的key抽取出来,并分拆join
适合场景
倾斜key的量不是很大
解决思路
将要倾斜的key抽取出来,对倾斜key单独处理,在对不倾斜的key处理,最后两表进行join或union
随机前缀和扩容RDD进行join
增加reduce数量
提高并行度
Flink
机器学习
Kylin
Linux
常用命令
top
查看内存
df -h
查看磁盘
netstart -tunlp | grep
查看端口占用情况
ll
子主题
shell脚本
Hadoop
HDFS
分布式存储文件系统
读写流程
读取流程
1.client先向NameNode申请要读取的文件
2.NN获取该文件所存储的DN位置,并返回给client
3.client在从指定的DN上拉取数据
写入流程
1.向NN申请上传文件
2.响应后,clinet会发送第一个block数据
3.创建一个通道(parpline),用于数据传输,并有应答机制来保证数据是否成功传输完成
4.返回ack后,才会继续传输,直到文件上传完成
HDFS会将完整的文件切分成一个个的block(默认128M),并将这些block分布到各个集群上
系统架构
NN
存储元数据信息,管理读写操作
DN
存储block
心跳机制
3s
会向NN汇报block的位置信息,安全机制
SSN
加快NN的启动时间
MapReduce
mapreduce是hadoop的计算框架
执行流程
1.每个block对应split
2.每一个split会分配一个maptask来处理
3.maptask会将数据写入到KVbuffle
4.KVBuffle会将数据优先写入到内存中,当达到阈值(80%)时开始溢写
在溢写下可选择combiner进行轻度的汇总,减轻reducetask要处理的数据量
5.溢写会对数据进行分区(按照key的hash值进行分区),排序(字典排序)
6.溢写的文件还会merga成大文件并分区
7.reducetask根据分区拉取merga文件(reduce的数量对应map的数量)
8.reducetask会进行合并排序,并把文件传到reduce
9.reduce会对这些文件归并,并最终传输到HDFS上
MR的计算也是基于内存计算是非常快的,但涉及到多次数据落地到磁盘和网络的IO传输,影响到其整体的运行效率,这也是MR最大的致命
数据倾斜问题
Yarn
架构
ResourceManager
资源分配管理器
NodeManager
执行流程
1.client会向RM申请application资源,RM返回资源路径,提交job
2.RM会将app拆分成多个task并根据资源调度器来分配NM
3.NM接收到app时,会初始化容器,并对这个app启动对应的APPMaster实例
4.容器运行期间会向RM汇报心跳,当任务执行完成后,application会向RM注销释放容器资源
hadoop2.X升级后出现yarn,由于hadoop1.x的伸缩性差,并且namenode只有一个,容易出现单点故障
调度器
FIFO
单队列,先进先出
容量调度器
多队列,也是先进先出,每个队列在同一阶段只有一个任务执行,队列数是队列的并行度数
企业中推荐
公平调度器
多队列,每个队列内部按照缺额大小进行分配资源启动任务
Hive
hive可以将sql语句解析成MR程序的分布式计算工具
架构
元数据
mysql
解析器
存储层
hdfs
操作
DDL(表操作)
创建表
create [external] table [if not exists] tablename(
`colname` datatype [comment 。。。]
) [comment ...]
partitioned by
clustered by
sorted by
row format
stored as
`colname` datatype [comment 。。。]
) [comment ...]
partitioned by
clustered by
sorted by
row format
stored as
数据类型
分区表
partitioned by(name String)
DML(CRUD操作)
查询
select from
大致与sql语句相同
分区排序
distribute by
常用查询函数
rank row_number
字段排序
case when
空值赋值处理
nvl(s1,-1)
列转行
lateral view explore
窗口函数
UDF
自定义函数
实现UDF--实现evaluate方法--add jar --create function
加载数据
load data (local本地) inpath 路径 overwrite into table 表名
外部表和内部表
外部表
删除外部表,只会删除元数据信息,并不会将真实数据删除(存储在HDFS上)
关键字External修饰
内部表
删除内部表,会将元数据和真实数据全部删除
可以互相转换
alter table 表名 set tblproperties('EXTERNAL'='TRUE');
hive优化
表优化
小表 join 大表
小表在前,大表在后
减少内存溢出的问题
大表 join 大表
解决
异常key值处理
给异常key值加盐,分布到不同的reduce
调整reducetask数量
在准备工作前配置历史服务
是可以在页面上查看reducer计算的耗时
一分为二,倾斜的key做mapjoin,不倾斜的key正常join,后面将2表数据union all
map join
开启预聚合
动态分区
sql语句调优
group by代替distinct
列裁剪
sort by代替order by
小文件合并
三种方式
HAR
可将所有小文件归档成一个大文件(*.har),该大文件有元数据信息,索引和文件内容
缺点
归档后,小文件还会存在,不会删除
归档后的文件不能压缩
SequenceFile
SequenceFile二进制格式文件,由K,V格式存储,K存储文件名,V存储文件内容,在业务处理之前进行手动合并
Combiner
在map端就将小文件进行预聚合合并
压缩
ORC,可以加快map端的数据网络IO到reduce,需自己安装,压缩后可切分
gzip
bzip
snappy
合理设置map数
增加
减少
JVM重用
存储格式
textFile
默认格式
RCFile
推荐,列式存储
ORCFile
推荐,RCFile改良版
SEQUENCEFILE
Hbase
habse是高性能,高可靠,基于列存储的非关系型数据库
架构
HMaster
负责管理RegionServer和读写操作
HRegionServer
存储数据,执行由HMaster分配的Region
Hlog
预写日志,记录RegionServer的一系列操作,以防宕机丢失数据
Store
列族
memstore
内存写入,达到阈值(128M)开始溢写磁盘,生成StoreFile
storFile
当单个storeFile超过阈值会分成2个相等的sorefile
Region
表信息,一个RegionServer可以有多个region,一个region只能有一个regionServer
ZK
存储元数据信息,监控和调度HMaster
三级寻址
zk记录root表信息,从root表在找到meat表信息,meat表信息记录真实数据
rowkey设计原则
长度原则
不超过64kb
散列原则
避免相同key数据走单个regionserver处理,出现数据热点问题
唯一性原则
hbase的rowkey是按照字典排序,利用这点,可以顺序存储
热点问题
单个regionserver处理过多的数据,从而影响到其他regionserver的运行,甚至宕机
解决办法
哈希
字符串反转
加盐
hbase读写流程
读流程
1.先从zk获取到regionserver位置信息
2.regionserver会优先找catch文件查看需要读取文件是否存在,如存在直接返回读取
3.没有,则去region找到store,先读取memstore数据
4.如果没有在去读取storeFile数据
写流程
1.获取到regionserver位置信息,对应的region表信息
2.先去Hlog写入日志,只有Hlog写入成功才会继续,在去region写入数据,region会优先去memstore写入
3.当memstore写入数据达到阈值会将数据溢写到磁盘,并生成storefile
4.storefile的数据量也达到阈值也会划分成两个相等的storefile
当storefile太小,会进行合并,生成大的storefile文件
Hbase优化
子主题
子主题
过滤器
布隆过滤器
Flume
flume可以将实时生成的日志信息流入到HDFS上的日志收集系统
架构
source
一般而言,最广泛地还是日志文件
Channel
缓冲区
memory channel
file channel
sink
hdfs
kafka
操作
编写相应的配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flme-ng agent --conf path --name a1 --conf-file 具体的配置路径
hdfs sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mytopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
a1.sinks.k1.topic = mytopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
Flume采集数据会丢失吗?(防止数据丢失的机制)
不会,channel的数据是存储在file中,数据传输自身有事务。
put
take
过滤器
子主题
Sqoop
可以将HDFS的数据导入到muysql上,也可以从mysql导入到hdfs的数据迁移工具
操作
导入(import)
sqoop import \
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--target-dir /user/sqoop \
--num-mapper 1 \
--fields-terminated-by "\t"
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--target-dir /user/sqoop \
--num-mapper 1 \
--fields-terminated-by "\t"
导出(export)
sqoop export \
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--export-dir /user/sqoop \
--num-mapper 1 \
--input-fields-terminated-by "\t"
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--export-dir /user/sqoop \
--num-mapper 1 \
--input-fields-terminated-by "\t"
一般而言都是写成脚本的方式运行
export \
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--export-dir /user/sqoop \
--num-mapper 1 \
--input-fields-terminated-by "\t"
--connect jdbc:mysql://node01:3306/sqoop \
--username root \
--password 123456 \
--table 表名 \
--export-dir /user/sqoop \
--num-mapper 1 \
--input-fields-terminated-by "\t"
脚本文件不需要刚开始的sqoop命令
sqoop --options-file 文件
Redis
基于内存存储的k,v格式的非关系型数据库
数据类型
list
hash
map
set
zset
主从复制
redis支持一主多从的模式架构,主节点支持读写操作,从节点只支持读操作
哨兵机制
sentinel.conf
监控
哨兵不断的检查主从服务器的状态
防止主从复制带来的单节点故障,如主节点宕机就会使redis丢失写入功能,只能读
当主节点宕机,会从从节点选举成为新的主节点
redis集群
HA
持久化方式
AOF
基于内存存储日志,容易丢失,存储的是操作命令,数据恢复起来慢,但启动速度快
RDB
基于磁盘存储日志,不容易丢失,存储的是数据本身,数据恢复起来快,但启动速度慢
Es
分布式搜索引擎
倒排索引
数据存储结构
基本命令
分片和备份
架构
JavaAPI
ZK
分布式协调管理工具,多用于大数据管理或协调各个组件的工作
架构
leader
投票的发起和决议,对外更新zk的状态
follower
接收客户端的请求和返回相应的结果,参加leader的选举投票(票数大于一半就会通过)
功能
消息同步
负载均衡
分布式集群管理
服务器动态上下线
操作
zkServer.sh start
启动zk
zkServer.sh stop
停止zk
zkServer.sh status
查看zk的状态
zkCli.sh
启动zk客户端
ls /
查看zk下存储的信息
get
create
0 条评论
下一页