Spark
2021-01-17 03:33:29 4 举报
AI智能生成
spark应用
作者其他创作
大纲/内容
Spark-Core
Spark
分布式计算框架,可以基于内存处理数据
与MR的区别
1.Spark可以基于内存处理数据,速度快
2.Spark 中有DAG有向无环图执行引擎
3.Spark有各类算子实现各种功能,MR只有map端和reduce端,相当于Spark中的两个算子map,reduceByKey
4.Spark 是粗立度资源申请,MR是细粒度资源申请
5.SparkShuffle相对于MR的shuffle更灵活
6.
Spark 技术栈
HDFS,YARN,MR,Hive ,Storm
SparkCore,SparkSQL,SparkStreaming,SparkMLlib
Spark 代码流程
案例:Spark WordCount
java
SparkConf conf = new SparkConf().setMaster("local").setAppName("xx")
JavaSparkContext sc = new JavaSparkContext(conf)
JavaRDD<String> rdd1 = sc.textFile(.)
JavaPairRDD rdd2 = rdd1.flatMap...mapToPair(new PairFunction(new Tuple2<String,Integer>)).reduceByKey
rdd2.foreach...
sc.stop()
scala
val conf = SparkConf().setMaster("local").setAppName("xxx")
val sc = SparkContext(conf)
val result = sc.textFile("").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.foreach...
sc.stop()
SparkConf
1.Spark 的运行模式:local,standalone,yarn,messos
2.设置Spark application 在WEBUI中的名称。
3.设置Spark application运行的资源,例如:内存
SparkContext
SparkContext Spark的上下文,通往Spark集群的唯一通道
代码流程步骤
1.创建SparkConf val conf = new SparkConf().setMaster..setAppName..
2.创建SparkContext val sc = new SparkContext(conf)
3.创建RDD val rdd = sc.textFile(...)
4.对RDD使用Transformation类算子操作
5.对RDD使用Action触发Transformation算子执行
6.sc.stop
Spark运行模式
Local
多用于本地测试,eclipse,IDEA本地开发Spark应用程序使用
Standalone
Spark自带的资源调度框架,支持分布式搭建
Yarn
Hadoop生态圈中的资源调度框架,Spark可以依赖Yarn去调度资源
Messos
资源调度框架
Spark核心RDD
RDD :弹性分布式数据集
RDD五大特性
1.RDD是由一系列partition组成
2.算子(函数) 是作用在RDD的partition上的
3.RDD之间有依赖关系
4.分区器是作用在K,V格式的RDD上
5.partition对外提供最佳的计算位置
注意
1.sc.textFile(..)底层使用的是MR读取HDFS中文件的方法,首先先split,每个split默认大小是128M,对应一个Block,每个split也对应RDD中一个个的partition
2.什么是K,V格式的RDD
RDD中的元素是一个个的二元组,这个RDD就是K,V格式的RDD
3.哪里体现了RDD弹性
RDD之间有依赖关系
RDD的分区数可以多可以少
4.哪里体现了RDD的分布式
partition是分布在多个节点上的
RDD是不存数据的,RDD的partition也是不存数据的,partition中存的是计算逻辑
算子
Transformation类算子,转换算子,懒执行,需要Action算子触发
flatMap
map
reduceByKey
sortBy/sortByKey
filter
sample
有无放回随机抽样
可以指定种子参数,每次抽到的数据一样
join
leftOuterJoin
rightOuterJoin
fullOuterJoin
intersection
subtract
union
distinct
map + reduceByKey + map
cogroup
mapPartitions
将RDD中每个分区为单位进行遍历
mapPartitionsWithIndex
repartition
重新分区,可以增大分区也可以减少分区
shuffle类算子,产生shuffle
repartiton(num) = coalesce(num.shuffle = true)
coalesce
重新分区,可以增大分区也可以减少分区,默认是不产生shuffle
如果由少的分区增加到多的分区,还指定不产生shuffle 不起作用
groupByKey
zip
zipWithIndex
Action算子 ,行动算子,触发Transforamtions类算子执行,Spark application中有一个action算子就有一个job
foreach
count
会将结果回收到Driver端
first
first = take(1)
会将结果回收到Driver端
take
take(Int)
会将结果回收到Driver端
collect
会将结果回收到Driver端
reduce
foreachPartition
countByKey
必须作用K,V格式的RDD
countByValue
持久化算子
cache()
默认将数据存储在内存中
cache() 是懒执行的,需要使用action触发执行
cache() = persist() = persist(StoageLevel.MEMORY_ONLY)
persist()
可以手动指定数据的存储级别,persist 默认就是将数据存入内存中(MEMORY_ONLY)
存储级别:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
避免使用 DISK_ONLY和“_2”级别
checkpoint()
checkpoint 直接将数据持久化到磁盘,且位置必须指定,当application运行完成之后不会被清空
对RDD使用checkpoint时,可以切断RDD的依赖关系
当Spark application 中 RDD的lineage长,就算复杂时,可以使用checkpint来将数据持久化到磁盘上,这样后面的RDD恢复时就可以基于当前的checkpoint位置恢复
checkpoint 可以管理Spark状态,保存原数据。
checkpoint也是懒执行,需要action触发执行
总结:
cache 和persist注意问题
1.cache 和persist都是懒执行算子,都需要action触发,持久化的最小单位是partition
2.可以对某个RDD使用cache或者persist之后赋值给一个变量,下次使用这个变量就是使用持久化的数据
3.当使用第2中这种方式,cache和persist之后不能紧跟action算子
4.cache和persist的数据位置是由Spark框架管理,当application执行完成之后,这些数据都会被清空
checkpoint执行流程
1.当Spark job执行完成之后,Spark会从后往前回溯,找到checkpointRDD做标记
2.回溯完成之后,Spark框架会重新启动一个job,这个job计算被标记的RDD,将结果持久化到指定的路径中
3.切断之前的RDD依赖关系
优化:对RDD进行checkpoint之前,可以对RDD进行cache,这样回溯完成重新启动job计算checkpointRDD数据时可以从内存中直接获取
Spark集群搭建
Standalone集群搭建
需要3台节点,一台Master,2台worker节点
步骤
1.配置jdk8,上传解压spark包
2.在Master节点,spark/conf/slaves配置worker节点
3.在spark/conf/spark-env.sh 配置
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT =7077
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=3g
4.将配置好的spark包发送到其他worker节点
5.在Master节点上启动集群,spark/sbin/start-all.sh
6.访问webui :node1:8080
客户端搭建
原封不动将Spark的安装包发送到一台新的节点就可以
Spark 基于Yarn提交任务配置
在客户端节点 spark/conf/spark-env.sh中配置 HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SparkPI 集群任务提交
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 1000
Spark任务执行
Standalone
client
命令
./spark-submit --master spark://node1:7077 --class ..jar ....xx
./spark-submit --master spark://node1:7077 --deploy-mode client --class ..jar ....xx
流程
1.在客户端提交application,Driver在客户端启动
2.客户端向Master申请资源
3.Master会找到满足资源的节点,启动Executor
4.Executor启动之后会反向注册给Driver
5.Driver 发送task,监控task,回收结果
注意:
standalone-client模式适用于程序测试,不适用于生产环境个,当在客户端提交多个任务时,会有网卡流量激增问题,在客户端中可以看到task的执行和结果
cluster
命令
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class ..jar ..xxx
流程
1.在客户端提交application,首先客户端向Master申请启动Driver
2.Master收到请求随机在一台Worker节点上启动Driver
3.Driver启动之后,会向Master申请资源,Master收到请求,找到一批满足资源的Worker节点启动Executor
4.Executor启动之后,会反向注册给Driver
5.Driver发送task,监控task,回收结果
注意:
standalone-cluster模式适用于生产环境,在客户端提交多个application时,将所有Driver分散到集群中运行的,没有单节点网卡流量激增的问题,在WEBUI中查看task执行和结果
Yarn
client
命令
./spark-submit --master yarn --class ..jar .xxx
./spark-submit --master yarn-client --class ...jar xxx
./spark-submit --master yarn --deploy-mode client --class ..jar xxx
流程
1.在客户端提交application,Driver在客户端启动
2.客户端向RS申请启动ApplicationMaster
3.RS收到请求之后,会随机在一台NM节点上启动AM,AM向RS申请资源用于启动Executor
4.RS收到请求,返回给AM一批资源NM节点
5.AM连接NM节点启动Executor,Executor启动之后反向注册给Driver
6.Driver发送task,监控task执行,回收结果
注意:
yarn-client模式适用于程序测试,不适用于生产环境,当在客户端提交多个任务时,会有网卡流量激增问题,在客户端中可以看到task的执行和结果
cluster
命令
./spark-submit --master yarn-cluster --class ...jar ..xxx
./spark-submit --master yarn --deploy-mode cluster --class ...jar ..xx
流程
1.在客户端提交application,客户端向RS申请启动ApplicationMaster
2.RS收到请求之后,会随机在一台NM节点上启动AM,当前这个AM就相当于Driver
3.AM启动之后,向RS申请资源用于启动Executor
4.RS收到请求,返回给AM一批资源NM节点
5.AM连接NM节点启动Executor,Executor启动之后反向注册给AM
6.AM发送task,监控task执行,回收结果
注意:
yarn-cluster模式适用于生产环境,在客户端提交多个application时,将所有Driver分散到集群中运行的,没有单节点网卡流量激增的问题,在WEBUI中查看task执行和结果
Spark 术语
任务层面:Application -> job -> stage -> tasks
资源层面:Master -> Worker -> Executor -> ThreadPool
Spark RDD宽窄依赖
RDD窄依赖
父RDD与子RDD的partition之间的关系是一对一
父RDD与子RDD的partition之间的关系是多对一
RDD宽依赖(shuffle)
父RDD与子RDD的partition之间的关系是一对多
Stage
Stage是由一组并行的task组成
Stage是按照job中RDD的宽窄依赖关系划分,有宽依赖就划分stage
Stage的计算模式 pipeline管道计算模式
stage中的数据什么时候落地
1.shuffle write
2.对RDD进行持久化
Stage的并行度是由Stage中finalRDD的partition个数决定的
提高stage的并行度?
增大RDD的分区数
例如
reduceByKey(xx,num)
join(xx,num)
groupByKey(num)
repartitioin/coalesce
Spark的资源调度和任务调度
资源调度
1.集群启动,Worker向Master汇报资源,Master掌握了集群资源
2.当spark-submit 提交应用程序,new SparkContext() 会创建两个对象 DAGScheduler,TaskScheduler
3.TaskScheduler 向Master申请资源
4.Master找到满足资源的节点,启动Executor
5.Executor启动之后,反向注册给TaskScheduler,Driver掌握了一批计算资源
任务调度
6.当RDD有action算子触发生成job时,job中RDD依赖关系形成DAG有向无环图,提交给DAGScheduler来划分
7.DAGScheduler按照RDD宽窄依赖关系切割job划分stage,将stage以taskSet形式提交给TaskScheduler
8.TaskScheduler遍历TaskSet,将task发送到Executor中执行
9.Driver监控task执行,回收结果
问题:
1.TaskScheduler会重试失败的task,重试3次,3次之后由DAGScheduler来重试stage,重试4次之后如果task依然失败,当前stage所在的job就失败了,如果application中有一个job失败,那么application就失败了
2.TashScheduler不仅可以重试执行失败的task,还可以重试执行缓慢的task,这是Spark中的推测执行机制,默认关闭的。对于ETL数据一定要关闭
- Spark粗粒度资源申请和细粒度资源申请
粗粒度资源申请--Spark
Application执行之前,先将所有资源申请完毕,task执行和job执行就不需要自己重新申请资源,task和job执行快,当最后一个task执行完成之后才会释放这批资源
优点:application执行快
缺点:集群资源不能充分利用
细粒度资源申请 -- MR
Application执行之前,每个job都会自己重新申请资源,job执行完之后就会释放这批资源
优点:集群资源可以充分利用
缺点:application 执行相对慢
PV&UV
PV:page view
UV:unquie Vistor
Spark提交任务参数
--conf
--master
--name
--jars
--driver-class-path
--files
--driver-cores
--driver-memory
--executor-cores
--executor-memory
--total-executor-cores
--executor-num
--supvise 当Driver失败,会自动重启Driver
Spark源码
Spark 资源调度源码
Spark资源调度结论
1.Executor在集群中分散启动,利于数据处理的本地化
2.如果提交任务默认什么参数都不指定,Standalone集群会为当前的application 在每台Worker上启动一个Executor,这个Executor会使用当前节点所有的core和1G内存
3.如果向要在一台Worker上启动多个Executor ,需要指定 --executor-cores
4.可以通过指定 --total-executor-core 来指定当前应用程序使用多少core
5.启动Executor不仅和core有关还和内存有关。
Spark资源调度结论验证
Spark 任务调度源码
从Action算子开始
二次排序
Spark中大于两列的排序都叫二次排序
封装对象,对象中实现Compare 接口,实现对象的排序
分组取topN
原生的集合排序
定长数组
广播变量
当Executor端使用到Driver端的变量时,如果使用广播变量,在每个Executor端都有一份Driver端的变量副本
当Executor端使用到Driver端的变量时,如果不使用广播变量,在每个Executor端有多少task就有多少变量副本
注意:
1.RDD不能广播出去,可以将RDD的结果广播出去
2.广播变量在Driver端定义,在Executor端不能改变广播变量的值
累加器
相当于集群中的统筹的变量
LongAccumulator
DoubleAccumulator
自定义累加器(项目)
Spark WEBUI
如何查看task数据倾斜
job -> stage -> tasks
端口
8080
8081
4040
18080
50070
8020
9000
9083
2181
9092
Spark 历史日志服务器
配置步骤:
1.在客户端中../conf/spark-defaults.conf文件中配置
spark.eventLog.dir hdfs://node1:9000/spark/test
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs://node1:9000/spark/test
spark.eventLog.compress true
2.启动 HDFS ,日志保存在HDFS中
3.启动历史日志服务器:../sbin/start-history-server.sh
4.查看webui :node4:18080
Spark Master HA
Master HA是针对Standalone集群,有两种方式
1.基于文件系统
缺点:手动恢复HA
2.基于zookeeper
存储
自动选举
使用zookeeper配置Master HA
1.在Master节点../conf/spark-env.sh 中配置
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181
-Dspark.deploy.zookeeper.dir=/sparkmaster0821"
2.将配置好的文件发送到其他节点 scp ./spark-evn.sh node2:`pwd`
3.找一台StandBy-Master 节点 ,配置Master : conf/spark-evn.sh 配置 SPARK_MASTER_HOST=node2
4.在Alive-Master节点启动集群: sbin/start-all.sh
5.在StandBy-Master节点上启动Master : start-master.sh
6.验证
集群已经提交的任务和将要提交的任务都不会受影响
Spark Shuffle
Spark1.6(包含1.6) 使用了HashShuffle 和SortShuffle,Spark2.0+ 只保留了SortShuffleManager
HashShuffleManager
普通机制
产生磁盘小文件的个数:M*R
过程
1.map处理完数据之后写往一个buffer缓冲区(默认与reduce task个数一致)
2.buffer默认32k,满了之后会溢写磁盘,每个buffer对应一个磁盘小文件
3.reduce 拉取数据
问题
产生磁盘小文件多
shuffle write对象多
shuffle read 对象多
shuffle 磁盘文件多,导致节点之间连接多,连接多受到网络不稳定导致的task失败多
优化机制
产生磁盘小文件个数:C*R
过程
1.一个core中map task处理完的数据写往一个buffer缓冲区
2.buffer默认32k,满了之后会溢写磁盘,每个buffer对应一个磁盘小文件
3.reduce 拉取数据
相对于普通机制产生的磁盘小文件个数少,如果reduce task 个数多,core多,产生的磁盘小文件也比较多
SortShuffleManager
普通机制
产生磁盘小文件的个数:2*M
过程
1.map task处理完数据之后,写往一个5M的内存数据结构
2.SortShuffle中会有估算内存机制,如果估算内存不够 ,会申请内存: 申请 = 估算*2 - 当前
3.申请不到内存,会溢写磁盘,溢写磁盘过程中有排序,最终形成2个磁盘文件,一个索引文件,一个数据文件
4.reduce 拉取数据
bypass机制
产生的磁盘小文件个数:2*M
相对于普通机制,只是在溢写磁盘的时候没有排序
触发bypass机制的条件
1.map 端不能有预聚合
2.reduce task 的个数必须小于spark.shuffle.sort.bypassMergeThreshold=200 参数
Shuffle文件寻址
MapOutputTracker
MapOutputTrackerMaster(Driver)
MapOutputTrackerWorker(Executor)
BlockManager
BlockManagerMaster(Driver)
DiskStore
MemoryStroe
BlockTransferService
BlockManagerSlave(Executor)
DiskStore
MemoryStroe
BlockTransferService
获取磁盘文件过程‘
1.map task处理完数据之后,将数据位置封装到MapStatus中,汇报给Driver中MapOutputTrackerMaster,Driver掌握磁盘数据位置
2.reduce task处理数据之前向Driver获取磁盘数据位置
3.reduce 端通过BlockManager连接数据所在节点,通过BlockTransferService拉取数据
4.BlockTransferService 一次启动5个task拉去数据,一次拉取48M,拉去数据存在Executor的shuffle聚合内存中
reduce oom?
增大Executor内存
增大shuffle内存比例
减少拉取的数据量
Spark内存管理
Spark1.6之前使用的是静态内存管理,Spark1.6之后使用的是统一内存管理,可以通过参数spark.memory.useLegacyMode 切换
静态内存
0.2
0.6
0.2
统一内存
300M
(总-300M) *0.25
(总-300M)*0.75
Shuffle调优
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
spark.shuffle.memoryFraction |spark.memory.fraction
spark.shuffle.sort.bypassMergeThreshold-
spark.shuffle.consolidateFiles
spark.shuffle.manager
SparkSQL
Spark SQL支持Spark中可以对分布式数据使用sql查询
Hive -> Shark -> SparkSQL
Shark & SparkSQL
1.Shark 底层解析器使用的是Hive,执行引擎是Spark,SparkSQL解析优化和执行引擎都是Spark
2.SparkSQL还可以读取原生的RDD,对RDD使用SQL查询
3.还可以将DataFrame转换成RDD使用
4.SparkSQL相对于Shark来说,完全脱离了Hive的限制
Spark on Hive-SparkSQL
Hive 存储
Spark:解析优化,执行引擎
Hive on Spark-Shark
Hive :解析优化 ,存储
Spark:执行引擎
DataFrame
DataFrame = Dataset<Row>
DataFrame 像一张二维表格,有数据还有列的schema信息
Dataset和RDD相比,Dataset的效率高
谓词下推
创建DataFrame
1.读取Json 格式的文件
sparkSession.read.json...
sparkSession.read.format("json").load...
列会按照Ascii 码排序
dataFrame.show(num)
dataFrame.printSchema
创建临时表的方式
dataFrame.createOrReplaceTempView ...
dataFrame.createOrReplaceGlobalTempView ...(可以跨session)
将DataFrame转换成RDD: val rdd : RDD[Row] = dataFrame.rdd
row.getAs[String](index)
row.get(“name”)
2.读取Json格式的RDD或者DataSet
导入隐式转换,才能使用 dataSet.toDF
import sparkSession.implicits._
3.读取普通的RDD加载DataFrame
通过反射的方式
1.首先需要将数据文件加载成RDD|Dataset
2.将RDD或者Dataset 装换成 对象类型的RDD|DataSet
3.RDD.toDF() | Dataset.toDF()
注意:自动将对象中的属性当做schema中的列,对象中属性的类型当做schema中列的类型
动态创建Schema
1.首先需要将数据文件加载成val rdd = RDD<Row>
2.构建StructType ,动态创建Schema,动态创建列的顺序要和构建的Row中的顺序一致
3.使用vla df :DataFrame = spark.createDataFrame(rdd,StructType)
4.读取Parquet文件 加载DataFrame
读取parquet文件 方式
sparkSession.read.parquet(...)
sparkSession.read.format("parquet").load(...)
可以将数据保存到parquet格式的文件中
5.读取MySQL 中的数据加载DataFrame
加载数据三种方式
1.spark.read.jdbc("jdbc:mysql://ip:3306/spark" ,tableName,properties)
2.spark.read.format("jdbc").options(map).load
3.spark.read.format("jdbc").option().option..load()
将DataFrame结果保存到MySql中
spark.write.mode(..).jdbc(url,tableName,properties)
参数:spark.sql.shuffle.partitions 指定sql执行时,解析成Sparkjob 的分区数,默认200
6.读取Hive中的数据加载DataFrame
加载Hive中的数据
sparkSession.enableHiveSupport()
sparkSession.sql("Hive 语句")
sparkSession.table("tableName")
将dataFrame存储在Hive中
dataframe.write.mode(SaveMode.Overwrite).saveAsTable("tabelName")
Spark on Hive配置
在Spark 客户端 ../conf中配置hive-site.xml
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1:9083</value>
</property>
</configuration>
测试:使用MR和SparkSQL 测试对同一批数据查询速度
./spark-shell --master spark://node1:7077,node2:7077
UDF
User defined Function -- 用户自定义函数
使用UDF是一对一的关系,读取一条数据处理得到一条数据
注册UDF:
spark.udf.register("udf Name ",fun)
使用
sparkSession.sql("select xx,udf Name from tableName ....")
UDAF
User defined aggregate Function -- 用户自定义聚合函数
开窗函数
row_number() over(partition by xxx order by xxx)
相当于Spark core中分组取topn
在Hive中使用开窗函数,在Mysql8之后也是支持的
SparkStreaming
SparkStreaming 流式处理模块,7*24小时不间断运行
SparkStreaming & Storm
1.SparkStreaming微批处理,Storm纯实时处理数据,SparkStreaming的吞吐量大
2.SparkStreaming擅长处理复杂的业务,可以在Streaming中使用core ,sql 处理数据,Strom擅长处理简单的汇总型业务
3.Strom的事务相对完善,SparkStreaming目前也是比较完善
4.Storm支持动态资源调度,Spark1.2之后也是支持动态资源调度,不建议开启
SparkStreaming读取Socket数据
nc - lk 9999
本地读取Scokect数据,底层采用了Receiver接受器模式,需要一个task用来接收 数据, local[2]
创建StreamingContext两种方式 val ssc = new StreamingContext(SparkConf|SparkContext ,Durations.Seconds(5))
batchInterval 相当于我们可以接受数据的延迟度。可以通过webui多次调节得到
SparkStreaming.start()之后不能添加新的业务逻辑
SparkStreaming .stop(true|false) 默认是true,会在关闭StreamingContext同时将SparkContext回收。false 在关闭StreamingContext时不会将SparkContext回收
SparkStreaming.stop()之后不能重新调用start() 方法重新启动
SparkStreaming每batchInterval 接收来的数据存入一个batch,这个batch被封装到一个RDD中,RDD被封装到DStream中
DStream有自己的Transformation类算子,懒执行,需要DStream的outputOperator类算子触发执行
如果集群处理一批次数据的时间大于BatchInterval ,任务会有堆积
如果集群处理一批次数据的时间小于BatchInterval ,集群不能充分利用
SparkStreaming 监控目录中的数据
不需要local[2]
目录中的数据必须是原子性产生,已将存在的文集不能被监控到
SparkStreaming 算子
Transformation
filter..
map..
flatMap...
transform
可以获取DStream中的RDD,最终需要返回一个RDD,返回的RDD被封装到一个DStream中
注意:
对获取的RDD可以使用RDD的transformation类算子,但是一定要返回RDD
transform算子内,获取的RDD的算子外的代码是在Driver端执行,可以通过这个特点做到动态改变广播变量的值
updateStateByKey
自从SparkStreaming启动以来,根据key更新状态
注意
需要设置Checkpoint,保存状态,状态默认保存在内存中和checkpoint中
多久将内存中的状态保存到checkpoint中?
batchInterval >10 batchInterval 保存一次
batchInterval <10 10s保存一次
reduceByKeyAndWindow
窗口函数,reduceByKeyAndWindow(fun(),窗口长度,滑动间隔)
窗口长度:wl,窗口长度必须是batchInterval的整数倍
滑动间隔:si,滑动间隔必须是batchInterval的整数倍
普通机制
每次计算,将窗口内的批次重新计算
优化机制
需要设置checkpoint保存上一次所有key的状态
window
窗口函数
val ds:DStream[xx] = window(窗口长度,滑动间隔)
outputOperator
foreachRDD
可以获取DStream中的RDD,一定要对获取的RDD使用Action算子触发
foreachRDD算子内,获取的RDD的算子外的代码是在Driver端执行,可以通过这个特点做到动态改变广播变量的值
print
saveAsTextFile
Driver HA
val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext)
当SparkStreaming应用程度停止之后,可以从checkpoint中恢复数据处理的位置(offset)
checkpoint中存储的数据
conf配置信息
DStream处理逻辑
消费数据的位置(offset)
问题:当代码逻辑变化时,既想从checkpoint中恢复offset信息,又想执行新的逻辑,是不能实现的。因为当恢复offset时,将旧的代码逻辑也恢复过来了
Kafka
术语
kafka是分布式的消息系统,将数据直接存储磁盘,默认保存7天
producer
消息的生产者,自己决定将数据存在哪个partition中,两种机制1.轮询2key的hash
broker
组成kafka集群的节点,之间没有主从关系,依赖zookeeper来协调,broker负责消息的读写,和存储
topic
一类消息,由 partition组成,可以创建topic时指定。topic中的partition目的是为了并行消费topic数据
partition
组成topic的单元,每个partition对应磁盘目录
每个partition有副本,可以在创建topic时指定
每个partition都由一个broker管理,这个broker叫做当前parititon的leader
每个partition只能被同一个消费者组内的一个消费者同时连接消费
consumer
每个消费者都有自己的消费者组
不同的组之间消费相同的topic互不影响
同一个组内的不同消费者消费同一个topic时,这个topic中相同的数据只能被消费一次
kafka 0.8.2 之前 consumer自己将消费者offset维护在zookeeper中
kafka0.9 + consumer 的offset是由kafka维护
zookeeper
存储原数据,broker,topic,partiton
kafka 0.8.2 之前存储消费者offset
命令
创建topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic topic2017 --partitions 3 --replication-factor 3
查看topic
./kafka-topics.sh --list --zookeeper node3:2181,node4:2181,node5:2181
console当做生产者
./kafka-console-producer.sh --topic topic2017
--broker-list node1:9092,node2:9092,node3:9092
console当做消费者
./kafka-console-consumer.sh --zookeeper node3:2181,node4:2181,node5:2181 --topic topic2017
查看某个topic的详细描述
./kafka-topics.sh --describe --zookeeper node3:2181,node4:2181,node5:2181 --topic topic2017
删除topic
在kafka broker节点 ../config/server.properties中配置 topic.delete.enable = true
kafka Leader均衡机制
SparkStreaming + kafka 整合
Spark1.6 + kafka 0.8.2
Spark2.3 + kafka0.11
0 条评论
下一页