Spark思维导图
2022-04-01 10:22:02 393 举报
AI智能生成
Spark学习思维导图
作者其他创作
大纲/内容
Spark入门
Spark基础知识
Spark 的发展
Mapreduce的缺点
①mr基于数据集计算,面向数据。对较小数据集处理较慢
②基本运算规则从存储介质中获取数据,然后进行计算,最后将计算结构存储到介质中,所以主要应用于一次性计算,不适合数据挖掘和机器学习这样的迭代计算和图形挖掘计算。
③mr基于文件存储介质的操作,性能非常的慢。
④mr于Hadoop紧密耦合在一起,无法动态替换
Spark思想
①spark基于hadoop1.X版本,采用了自己的方法改替了mr的计算缺点
②spark计算基于内存,并且基于Scala语法开发,所以天生适合迭代式计算
hadoop框架发展
1.X版本:hdfs+mapreduce框架
2.X版本:hdfs+spark+yarn框架
Spark介绍
Spark是一种通用内存计算框架,使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集
Spark特点
①快:与mr相比,spark基于内存的运算要快100倍以上,基于硬盘运算快10倍以上
②易用:spark支持Java,python,Scala的API
③通用:spark提供了统一的解决方案。可以用于批处理,交互式查询,实时流处理,机器学习,图计算
④兼容性:spark可以非常方便的与其他开源产品进行融合
Spark内置模块
①spark core:实现了spark的基础功能(任务调度,内存管理,错误恢复,与存储系统交互等),以及对弹性api数据集的API定义。
②spark SQL:是spark用来操作结构化数据的程序包,支持多种数据源hive,parquet,josn等。
③spark streaming:对实时数据进行流式计算的组件,提供了用来操作数据流的API,并于spark core中的RDD API高度对应
④spark MLlib:提供常见的机器学习(ML)功能的程序库
⑤Graphx:是spark面向图像计算提供的框架和算法库
⑥集群管理器:spark设计为可以高效的在一个计算节点到数千个计算节点之间伸缩计算。
Spark使用场景
①复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时
②基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间
③基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间
Spark部署模式
①local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程;master和worker都为本机。运行时直接加断点调试即可。
②standalone(集群模式):构建一个有Master+Slave构成的Spark集群,spark运行在集群中。简单来说,只用spark,不用yarn,所以不需要启动Hadoop
③on yarn(集群模式): 运行在 yarn 资源管理器框架之上,由 yarn 负责资源管理,Spark 负责任务调度和计算
④on mesos(集群模式): 运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算
⑤on cloud(集群模式):比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon的 S3;Spark 支持多种分布式存储系统:HDFS 和 S3
Spark运行流程
Spark运行流程
Spark运行流程图
Spark通用运行流程
①任务提交后,都会先启动Driver进程
②Driver进程向集群管理器注册应用程序application
③集群管理器根据此任务的配置文件分配Execute并启动
④当Driver所需的资源全部满足后,Driver开始执行main函数
⑤Spark查询为懒执行,当指向到action(行动)算子时,开始反向推算
⑥根据宽依赖进行stage(阶段)的划分,随后每一个stage对应一个taskset,taskset中对应很多个task
⑦根据本地化原则,task会被分发到指定的Execute中执行
⑧在执行任务的过程中,Execute也不断与Driver进行通信,报告任务运行情况
⑨Task在Executor上运行完释放所有资源
Spark基本使用
启动Spark:bin/start-all.sh
启动Spark-shell:bin/spark-shell
使用Spark求圆周率(官方案例)
bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \ //指定master是哪台机器
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.3.2.jar \
100
--master spark://master:7077 \ //指定master是哪台机器
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.3.2.jar \
100
Spark RDD
RDD基础
RDD介绍
①RDD(resilient Distributed Dataset)弹性分布式数据集,是spark中基本的逻辑(数据)抽象。
②代码中是一个抽象类,它代表一个不可变,可分区,里面的元素可以并行(多个线程一起执行)计算的集合
RDD属性
①一组分区(partition)即数据集的基本组成单位
②一个计算每个分区的函数
③RDD之间的依赖关系
④一个Partitioner,即RDD的分片函数
⑤一个列表,存储存取每个Partition的优先位置(preferred location)
RDD特点
①分区:RDD逻辑上是分区的,每个分区数据是抽象存在的
从内存中创建RDD分区规则
从内存中创建RDD分区有两个函数 parallelize 和 makeRDD 这两个函数底层原理一样。用这两个函数创建RDD时,如果没有传入分区参数,他会默认将cpu核数的个数和2进行比较,较大的那个数为分区数;如果创建RDD时,传入了分区数,则会将转入的分区参数和2进行比较,较大的做为分区数
从外部存储中创建RDD分区规则
读取文件时,传递分区参数为最小分区数(传递的分区数和2进行比较),但不一定时这个分区数,取决于Hadoop读取文件时的分片规则
①只读: RDD表示只读的分区的数据集,要想改变RDD的数据,只能在现有的RDD基础通过算子转换创建新的RDD
②转换:由于RDD不可变,所以对RDD进行改动,只能通过转换操作
③依赖:从一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必须的信息,RDD之间存在依赖关系
④持久化:如果血缘关系较长,可以通过持久化(检查点)RDD来切断血缘关系
RDD的创建
1.从集合(内存)中创建
从集合中创建RDD,spark主要提供了两种函数,parallelize 和 makeRDD
1.使用parallelize()
val rdd = sc.parallelize(Array(1,2,3,4))
2.使用makeRDD()
val rdd1 = sc.make(Array(1,2,3,4))
3.这两种方式等价,makeRDD底层就是parallelize,没有区别
1.使用parallelize()
val rdd = sc.parallelize(Array(1,2,3,4))
2.使用makeRDD()
val rdd1 = sc.make(Array(1,2,3,4))
3.这两种方式等价,makeRDD底层就是parallelize,没有区别
2.从外部存储中创建
包括:本地文件系统,Hadoop支持的数据集(HDFS,hbase,cassandra)
val rdd2 = sc.textFile("hdfs || file 路径")
说明:
①默认情况下可以读取项目路径,也可以读其他路径 hdfs
②默认从文件中读取的数据都是字符串类型
val rdd2 = sc.textFile("hdfs || file 路径")
说明:
①默认情况下可以读取项目路径,也可以读其他路径 hdfs
②默认从文件中读取的数据都是字符串类型
3.从其他创建
RDD的保存
rdd1.saveAsTextFile("output") //output文件夹自动创建
常用的RDD算子
转换算子
1.map(func)算子
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
过程:每次处理一个元素
需求:创建一个RDD,将所有的元素*2后返回新的RDD
var rdd1 = sc.parallelize(1 to 10)
var rdd2 = rdd1.map(_*2).collect()
rdd2.foreach(println)
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
过程:每次处理一个元素
需求:创建一个RDD,将所有的元素*2后返回新的RDD
var rdd1 = sc.parallelize(1 to 10)
var rdd2 = rdd1.map(_*2).collect()
rdd2.foreach(println)
2.mapPartitions(func)算子
作用:类似于map,但独立地在RDD的每一个分片(分区)上运行
过程:每次处理一个分区,运行结果和map一样,只是过程不同
优点:效率优于map算子,减少了发送到执行器的执行的交互次数
缺点:当分区大小超过执行器的内存时,会出现内存溢出(oom)
val mapPar = rdd1.mapPartitions(datas=>{datas.map(data=>data*2)})
作用:类似于map,但独立地在RDD的每一个分片(分区)上运行
过程:每次处理一个分区,运行结果和map一样,只是过程不同
优点:效率优于map算子,减少了发送到执行器的执行的交互次数
缺点:当分区大小超过执行器的内存时,会出现内存溢出(oom)
val mapPar = rdd1.mapPartitions(datas=>{datas.map(data=>data*2)})
3.flatMap(func)算子
作用:类似于map,但是每一个输入元素可以被映射为0或者多个元素;所以要求传入的func函数返回的是一个序列,而不是单一元素。
简单来说:就是传入的元素有可能是一个集合,将集合的每个元素取出,并返回(扁平化操作)
使用:
val listRDD:RDD[list[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))
val flatmap:RDD[Int] = listRDD.flatMap(datas => datas)
作用:类似于map,但是每一个输入元素可以被映射为0或者多个元素;所以要求传入的func函数返回的是一个序列,而不是单一元素。
简单来说:就是传入的元素有可能是一个集合,将集合的每个元素取出,并返回(扁平化操作)
使用:
val listRDD:RDD[list[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))
val flatmap:RDD[Int] = listRDD.flatMap(datas => datas)
4.glom()算子
说明:这个算子不用传参数,它会将同一个分区中的元素放到一个数组里,由多少个分区就有多少个数组
作用:可以利用这个算子,对分区做相应的运算
使用:
val rdd:RDD[Int] = sc.makeRDD(List(1,2,32,3,4,5,4),3)
//将同一个分区中的元素放到同一个数组中,返回值的类型是数组
val glomRDD:RDD[Array[Int]] = rdd.glom()
//将结果打印,这是就可以对数组进行操作
glomRDD.collect().foreach(arr => {
println(arr.mkString(","))
})
说明:这个算子不用传参数,它会将同一个分区中的元素放到一个数组里,由多少个分区就有多少个数组
作用:可以利用这个算子,对分区做相应的运算
使用:
val rdd:RDD[Int] = sc.makeRDD(List(1,2,32,3,4,5,4),3)
//将同一个分区中的元素放到同一个数组中,返回值的类型是数组
val glomRDD:RDD[Array[Int]] = rdd.glom()
//将结果打印,这是就可以对数组进行操作
glomRDD.collect().foreach(arr => {
println(arr.mkString(","))
})
5.groupBy(func)算子
作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
需求:创建一个RDD,按照元素模以2的值进行分组
说明:分组后的数据形成了对偶元组(k-v),k表示分组的key
使用:
val rdd1:RDD[Int] = sc.make(List(1,2,3,4))
val groupby:RDD[(Int,Iterate[Int])] = rdd1.groupBy(i=>i%2)
作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
需求:创建一个RDD,按照元素模以2的值进行分组
说明:分组后的数据形成了对偶元组(k-v),k表示分组的key
使用:
val rdd1:RDD[Int] = sc.make(List(1,2,3,4))
val groupby:RDD[(Int,Iterate[Int])] = rdd1.groupBy(i=>i%2)
6.filter(func)算子
作用:过滤,根据传入的逻辑,然会判断结果为true的元素,过滤掉false的元素
使用:
val rdd1:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val res:RDD[Int] = rdd1.filter(x=>x%2==0)
结果:res = 2,4题
作用:过滤,根据传入的逻辑,然会判断结果为true的元素,过滤掉false的元素
使用:
val rdd1:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val res:RDD[Int] = rdd1.filter(x=>x%2==0)
结果:res = 2,4题
7.distinct(【numPartition】)算子
作用:对数据集进行去重操作,可以指定去重后的分区数,也可以不指定
使用:
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,2,4,3))
val distinct:RDD[Int] = rdd.distinct()
val distinct2:RDD[Int] = rdd.distinct(2) //指定两个分区
作用:对数据集进行去重操作,可以指定去重后的分区数,也可以不指定
使用:
val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,2,4,3))
val distinct:RDD[Int] = rdd.distinct()
val distinct2:RDD[Int] = rdd.distinct(2) //指定两个分区
8.coalesce(numPartitions)
作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
需求:创建一个4个分区的RDD,对其缩减成3个分区
使用:
val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //指定4个分区
println("缩减分区前"+listRDD.partitions.size)
val coalesceRDD:RDD[Int] = listRDD.coalesce(3)
println("缩减分区后"+listRDD.partitions.size)
作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
需求:创建一个4个分区的RDD,对其缩减成3个分区
使用:
val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //指定4个分区
println("缩减分区前"+listRDD.partitions.size)
val coalesceRDD:RDD[Int] = listRDD.coalesce(3)
println("缩减分区后"+listRDD.partitions.size)
9.repartitions(numPartitions)
作用:根据分区数,重新通过网络随机洗牌(shuffle)所有数据,可以解决数据倾斜
需求:创建一个4个分区的RDD,对其分区
val rdd = sc.paralleilze(1 to 16,4)
println("缩减分区前"+rdd.partitions.size)
val rerdd = rdd.repartitions(2)
println("缩减分区前"+rerdd .partitions.size)
repartitions算子底层就是掉的coalesce shuffle=true
作用:根据分区数,重新通过网络随机洗牌(shuffle)所有数据,可以解决数据倾斜
需求:创建一个4个分区的RDD,对其分区
val rdd = sc.paralleilze(1 to 16,4)
println("缩减分区前"+rdd.partitions.size)
val rerdd = rdd.repartitions(2)
println("缩减分区前"+rerdd .partitions.size)
repartitions算子底层就是掉的coalesce shuffle=true
10.sortByKey 按照key值进行排序 默认是升序,加入false参数后就是降序排序
11.mapValue 只对k-v类型的value进行操作
12.join 将类型为(k,v) (k.m) 形成一个新的RDD (k,(v,m))。即将key相同的value组合到一起形成一个元组。如果有一对k-v在另一个RDD中不存在,这个k-v就不会返回
行动算子
1.reduce 化简
2.collect() 在驱动程序中,以数组的形式返回数据集中的所有元素
3.count() 返回RDD中元素的个数
4.first() 返回RDD中第一个元素
5.take(n) 取出RDD中前n个元素
6.takeOrdered(n) 取出RDD排序后的前n个元素
7.aggregate(n)(func,func) 将每个分区里面的元素和初始值进行聚合,分区与分区聚合时也需要加上初始值
8.fold(n)(func) 分区内和分区间的操作相同时的aggregate简化操作
9.saveASTextFile(path) 保存成文本文件
10.saveAsSequenceFile(path) 将元素保存成Hadoop sequence的个数保存
11.savaAsObject(path) 将RDD中的元素保存成序列化对象
12.countByKey 返回每一个key的个数
13.foreach
小结
每调用一次行动算子,就会执行一次runjob
行动算子在driver中执行,转换算子在executor中执行
行动算子在driver中执行,转换算子在executor中执行
RDD函数传递
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver中进行的,而实际运行是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的
RDD依赖关系
一.RDD依赖关系基本介绍
RDD只支持粗粒度转换,即在大量记录上执行单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD元数据信息和转换关系,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
RDD只支持粗粒度转换,即在大量记录上执行单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD元数据信息和转换关系,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
二.查看RDD之间的依赖关系
list.toDebugString //查看RDD的所有依赖关系
list.dependencies //查看当前RDD上一层依赖关系
list.toDebugString //查看RDD的所有依赖关系
list.dependencies //查看当前RDD上一层依赖关系
三.窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,即一个分区的数据原封不动的给另一个分区就叫窄依赖。可以形象的比喻成独生子女
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,即一个分区的数据原封不动的给另一个分区就叫窄依赖。可以形象的比喻成独生子女
四.宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,这就会引起shuffle。简单的说,就有shuffle过程的都叫做宽依赖。可以形象的比喻成超生
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,这就会引起shuffle。简单的说,就有shuffle过程的都叫做宽依赖。可以形象的比喻成超生
五.DAG
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列转换就形成了DAG,根据DAG之间的依赖关系的不同将DAG划分成不同的stage(阶段)。对窄依赖,partition的转换处理在stage中完成计算。对于宽依赖,由于有shuffle的存在,只能在parent(起源)RDD处理完成后,才能进行接下来的计算,因此宽依赖是划分stage的依据
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列转换就形成了DAG,根据DAG之间的依赖关系的不同将DAG划分成不同的stage(阶段)。对窄依赖,partition的转换处理在stage中完成计算。对于宽依赖,由于有shuffle的存在,只能在parent(起源)RDD处理完成后,才能进行接下来的计算,因此宽依赖是划分stage的依据
六.任务划分(面试重点)
RDD任务切分中间分为:Application(应用) Job(作业) Stage(阶段) Task(任务)
①Application:初始化一个SparkContext 即生成Application
②Job:每个action(行动)算子都会生成一个Job
③Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖就会划分一个Stage
④Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task
注意: 1.Application -> Job -> Stage ->Task 每一层都是1对n的关系
2.每个分区都对应一个任务Task,即将每个分区交给Executor执行
3.判断Task数量的方法,窄依赖之间只划分一个Task,宽依赖根据执行结果最后有几个分区来判断Task数量,每个分区对应一个Task
2.每个分区都对应一个任务Task,即将每个分区交给Executor执行
3.判断Task数量的方法,窄依赖之间只划分一个Task,宽依赖根据执行结果最后有几个分区来判断Task数量,每个分区对应一个Task
RDD缓存与检查点
RDD缓存
1.RDD的缓存通过persist方法或cache方法将前面的计算结果缓存,默认情况下persist()会把数据序列化的形式缓存在JVM的推空间中
2.但是并不是这两个方法被调用时立即缓存,而是触发后面的action(行动算子)时,该RDD将会缓存在计算节点的内存中,并供后面重用
3.缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重用,由于RDD的各个partition分区是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition
RDD检查点
spark中对于数据的保存处理持久化操作之外,还提供了一种检查点的机制,检查点本质是通过RDD写入disk(磁盘)做检查点,是为了通过lineage做容错的辅助,lineage过长会导致容错成本过高,这样就不如在中间点做检查点容错,如果之后右节点出现问题而丢失分区,从做检查点RDD开始重做lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查功能
1.RDD缓存和检查点一般用于RDD血缘关系较长时。
2.缓存存在内存中,检查点存在磁盘中
数据的存储与读取
本地文件系统
MySQL数据库
Hbase数据库
RDD编程进阶
累加器
累加器是只写共享变量
累加器用来对信息进行聚合,通常在向spark传递函数时,比如使用map()函数 或者filter()传条件时,可以使用驱动器程序种定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们向实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们的需求
广播变量
广播变量是只读共享变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,spark会为每个任务分别发送
Spark SQL
Spark SQL基础
介绍
1.spark SQL是模仿hive而来的,它是spark用来处理结构化数据的一个模块,它提供了2个编程抽象,DataFrame 和 DataSet,并且作为分布式SQL查询引擎的作用
2.我们已经学习了hive,它是hive SQL转换成mapreduce然后提交到集群中执行,大大简化了mapreduce的程序的复杂性,由于mapreduce这种计算框架执行效率比较慢。所以spark SQL应运而生,它将spark SQL转换成RDD,然后提交集群执行,所以执行效率非常快
特点
1.易整合
2.统一的数据访问方式
3.兼容hive
4.标准的数据连接
2.统一的数据访问方式
3.兼容hive
4.标准的数据连接
两个编程抽象
DataFrame
与RDD类似,DataFrame也是一个分布式数据容器,然而DataFrame更像传统数据库的二维表格,处理表格以外,还记录数据的结构信息,即schema。同时,与hive类似,DataFrame也支持嵌套数据类型(struct,array,map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式RDD API要更加友好了,门槛更低
DataSet
1.是DataFrame API的一个扩展,是spark最新的数据抽象
2.用户友好API风格,既具有类型安全检查也具有DataFrame的查询优化特性
3.DataSet支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率
4.样例类被用在DataSet中定义数据的结构信息,样例类中的每个属性的名称直接映射到DataSet中的字段名称
2.用户友好API风格,既具有类型安全检查也具有DataFrame的查询优化特性
3.DataSet支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率
4.样例类被用在DataSet中定义数据的结构信息,样例类中的每个属性的名称直接映射到DataSet中的字段名称
说明
1.DataFrame在RDD的基础上增加了结构,这样才能供sql查询
2.DataSet在DataFrame的基础上增加了类型
3.如果同样的数据都给到着三个数据类型,它们分别计算之后,都会给出相同的结构。不同的是它们执行的效率和执行的方式
4.在后期的spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口
5.DataFrame是DataSet的特例,DataFrame = DataSet[Row],所以可以通过as的方式将DataFrame转换成DataSet,Row是一个类型,更Car,Person这些类型一样,所以有表结构的信息我们都用Row来表示
6.DataSet是强类型的,比如可以有DataSet[Car],DataSet[Person]
7.DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法
Spark SQL创建
DataFrame创建
1.1.SparkSession是spark最新的SQL查询的起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的,SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的
2.在sparkSQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过spark的数据源进行创建;从一个存在的RDD进行转换;还可以从一个hive table中查询返回
2.在sparkSQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过spark的数据源进行创建;从一个存在的RDD进行转换;还可以从一个hive table中查询返回
①spark.read可以读文件的类型:
csv format jdbc json load option options orc parquet schema table text textFile
②创建DataFrame
val df = spark.read.json("file:////export/servers/spark/data.json")
③查看DataFrame中的内容
df.show
④将DataFrame转换成一个临时视图(表),以便于使用SQL
df.createTempView("student") //student为视图名
⑤使用SQL查询语句
spark.sql("select * from student").show
注意:临时表是Session范围内的,Session退出后,表就失效了。如果向应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问。如global_temp.people
csv format jdbc json load option options orc parquet schema table text textFile
②创建DataFrame
val df = spark.read.json("file:////export/servers/spark/data.json")
③查看DataFrame中的内容
df.show
④将DataFrame转换成一个临时视图(表),以便于使用SQL
df.createTempView("student") //student为视图名
⑤使用SQL查询语句
spark.sql("select * from student").show
注意:临时表是Session范围内的,Session退出后,表就失效了。如果向应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问。如global_temp.people
⑥对于DataFrame创建一个全局表
df.createGlobalTempView("people")
⑦通过SQL全局实现查询全表
sprk.sql("select * from global_temp.people").show()
⑧在新的session中访问全局表
spark.newSession().sql("select * from global_temp.people").show()
df.createGlobalTempView("people")
⑦通过SQL全局实现查询全表
sprk.sql("select * from global_temp.people").show()
⑧在新的session中访问全局表
spark.newSession().sql("select * from global_temp.people").show()
DSL风格
1.创建一个DataFrame
val df = spark.read.json("file:////export/servers/spark/data.json")
2.查看DataFrame的Scheme信息
df.printSchema
4.查看“name”列的数据
df.select("name").show()
5.将年龄字段加 1
df.select($"age"+1).show
6.查看年龄大于18的行
df.filter($"age">18).show
7.根据每个年龄段进行分组
df.groupBy("age").count().show
1.创建一个DataFrame
val df = spark.read.json("file:////export/servers/spark/data.json")
2.查看DataFrame的Scheme信息
df.printSchema
4.查看“name”列的数据
df.select("name").show()
5.将年龄字段加 1
df.select($"age"+1).show
6.查看年龄大于18的行
df.filter($"age">18).show
7.根据每个年龄段进行分组
df.groupBy("age").count().show
DataSet创建
二.DataSet创建
//创建样例类
case class person(name:String,age:Int)
//根据样例类模板创建DS
val classDS = Seq(person("zhangsan",17)).toDS
//查看
classDS.show
三.将DS转换成表,以便于使用sql语法
classDS.createTempView("xxx")
spark.sql("select * from xxx").show
一.DataSet的基本介绍
DataSet是具有强类型的数据集合,需要提供对应的类型信息
DataSet是具有强类型的数据集合,需要提供对应的类型信息
二.DataSet创建
//创建样例类
case class person(name:String,age:Int)
//根据样例类模板创建DS
val classDS = Seq(person("zhangsan",17)).toDS
//查看
classDS.show
三.将DS转换成表,以便于使用sql语法
classDS.createTempView("xxx")
spark.sql("select * from xxx").show
RDD DataFrame DataSet转换
RDD与DataFrame相互转换
四.将DataFrame转换成RDD
val rdd2 = df.rdd
一.RDD转换成DataFrame
1.如果需要RDD与DF或者DS之间进行转换,那么需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的名称】
2.前置条件:导入隐式转换并创建一个RDD
1.如果需要RDD与DF或者DS之间进行转换,那么需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的名称】
2.前置条件:导入隐式转换并创建一个RDD
二.导包的方式转换
import spark.implicits._
val rdd = sc.makeRDD(List(1,2,23,4,5))
rdd.toDF("id") //为RDD添加结构就成了DF
df.show
import spark.implicits._
val rdd = sc.makeRDD(List(1,2,23,4,5))
rdd.toDF("id") //为RDD添加结构就成了DF
df.show
三.样例类方式转换
val rdd = sc.makeRDD(List(("zhangsan",22),("lisi",32),("wangwu",15)))
//创建样例类
case class people(name:String,age:Int)
//通过map算子将rdd中每个元素传入到样例类?(指定结构)
val peopleRDD = rdd.map(t => people(t._1,t._2))
//转换
val df = peopleRDD.toDF
df.show
val rdd = sc.makeRDD(List(("zhangsan",22),("lisi",32),("wangwu",15)))
//创建样例类
case class people(name:String,age:Int)
//通过map算子将rdd中每个元素传入到样例类?(指定结构)
val peopleRDD = rdd.map(t => people(t._1,t._2))
//转换
val df = peopleRDD.toDF
df.show
四.将DataFrame转换成RDD
val rdd2 = df.rdd
RDD与DataSet相互转换
一.将RDD转换成DataSet
val rdd = sc.makeRDD(List(("zhangsan",10),("lisi",90),("wangwu",29)))
//创建样例类
case people(name:String,age:Int)
//将rdd中元素传入到样例类中
val mapRDD = rdd.map(t => {people(t._1,t._2)})
//转换成DS
val ds = mapRDD.toDS
val rdd = sc.makeRDD(List(("zhangsan",10),("lisi",90),("wangwu",29)))
//创建样例类
case people(name:String,age:Int)
//将rdd中元素传入到样例类中
val mapRDD = rdd.map(t => {people(t._1,t._2)})
//转换成DS
val ds = mapRDD.toDS
二.将DS转换成RDD
val rdd2 = ds.rdd
val rdd2 = ds.rdd
DataFrame与DataSet相互转换
DataFrame 有结构
DataSet 有类型并且有结构
前提:需要导入隐式转换 import spark.implicits._
DataSet 有类型并且有结构
前提:需要导入隐式转换 import spark.implicits._
一.将DF变成DS
val rdd = sc.makeRDD(List(("zhangsan",22),("lisi",32),("wangwu",15)))
//将RDD加上结构就变成了DF
val df = rdd.toDF("name","age")
//将DF加上类型就变成了DS
case class people(name:String,age:Int) //这个就是类型
val ds = df.as[people]
val rdd = sc.makeRDD(List(("zhangsan",22),("lisi",32),("wangwu",15)))
//将RDD加上结构就变成了DF
val df = rdd.toDF("name","age")
//将DF加上类型就变成了DS
case class people(name:String,age:Int) //这个就是类型
val ds = df.as[people]
二.将DS变成DF
val df2 = ds.toDF
val df2 = ds.toDF
三者之间的共性
一.三者的共同点
1.RDD DataFrame DataSet全部是spark平台的下的分布式弹性数据集,为处理超大型数据提供便利
2.三者都有惰性机制,在进行创建,转换,如map方法时,不会立即执行,只有遇到行动算子Action,如foreach时,三者才会开始遍历
3.三者都会根据spark内存情况自动缓存运算,这样即使数据量很大,也不担心内存溢出
4.三者都有partition的概念
5.三者都有许多共同的函数,如filter sort等
6.在对DataFrame和DataSet进行操作到需要这个包支持 import spark.implicits._
7.DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型
1.RDD DataFrame DataSet全部是spark平台的下的分布式弹性数据集,为处理超大型数据提供便利
2.三者都有惰性机制,在进行创建,转换,如map方法时,不会立即执行,只有遇到行动算子Action,如foreach时,三者才会开始遍历
3.三者都会根据spark内存情况自动缓存运算,这样即使数据量很大,也不担心内存溢出
4.三者都有partition的概念
5.三者都有许多共同的函数,如filter sort等
6.在对DataFrame和DataSet进行操作到需要这个包支持 import spark.implicits._
7.DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型
二.三者的区别
1.RDD一般和sparkmlib同时使用,但不支持sparkSQL操作
2.DataFrame与RDD和DataSet不同,DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过解析才能获取各个字段的值
3.DataFrame和DataSet一般不与sparkmilb同时使用
1.RDD一般和sparkmlib同时使用,但不支持sparkSQL操作
2.DataFrame与RDD和DataSet不同,DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过解析才能获取各个字段的值
3.DataFrame和DataSet一般不与sparkmilb同时使用
IDEA创建Spark SQL程序
一.添加pom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
二.代码演示,创建Session,RDD DataFrame DataSet三者进行转换
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01")
//构建sparkSQL环境对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//RDD,DataFrame,DataSet三者进行转换之前
//需要引入隐式转换规则
//这里的spark含义不是包名的意思,而是Session对象的名称
import spark.implicits._
//创建rdd
val rdd:RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(List((1,"张三",20),(2,"李四",30),(1,"王五",22)))
//将 rdd 转换成 DF
val df:DataFrame = rdd.toDF("id","name","age")
//将 DF 转换成 DS
//前提:需要创建类型规程,即创建一个样例类
val ds:Dataset[User] = df.as[User]
//将 DS 转换成 DF
val df1 = ds.toDF()
//将 DF 转换成 RDD
val rdd1:RDD[Row] = df1.rdd
//打印rdd1
rdd1.foreach(
//打印姓名
row => {
println(row.getString(1))
println(row)
}
)
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01")
//构建sparkSQL环境对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//RDD,DataFrame,DataSet三者进行转换之前
//需要引入隐式转换规则
//这里的spark含义不是包名的意思,而是Session对象的名称
import spark.implicits._
//创建rdd
val rdd:RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(List((1,"张三",20),(2,"李四",30),(1,"王五",22)))
//将 rdd 转换成 DF
val df:DataFrame = rdd.toDF("id","name","age")
//将 DF 转换成 DS
//前提:需要创建类型规程,即创建一个样例类
val ds:Dataset[User] = df.as[User]
//将 DS 转换成 DF
val df1 = ds.toDF()
//将 DF 转换成 RDD
val rdd1:RDD[Row] = df1.rdd
//打印rdd1
rdd1.foreach(
//打印姓名
row => {
println(row.getString(1))
println(row)
}
)
用户自定义函数
UDF
用户自定义函数
1.强类型的DataSet和弱类型的DataFrame都提供了相关的聚合函数,如count(),countdistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义函数
1.强类型的DataSet和弱类型的DataFrame都提供了相关的聚合函数,如count(),countdistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义函数
UDAF
用户自定义聚合函数
1.通过继承UserDefineAggregateFunction来实现用户自定义聚合函数,直接用SQL语句就可以使用该函数
2.通过继承Aggregator,可用DSC风格使用函数
1.通过继承UserDefineAggregateFunction来实现用户自定义聚合函数,直接用SQL语句就可以使用该函数
2.通过继承Aggregator,可用DSC风格使用函数
Spark SQL数据源
文件的读取与保存
Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改spark.sql.sources.default可以修改默认的数据源格式
二.读取文件
//读取json文件常规的方式
spark.read.json("/export/servers/spark/data.json")
//通用的读取文件方式(本地文件)
spark.read.format("json").load("/export/servers/spark/data.json")
//读取hdfs上的文件
spark.read.format("json").load("hdfs://192.168.220.25:9000/data.json")
//读取json文件常规的方式
spark.read.json("/export/servers/spark/data.json")
//通用的读取文件方式(本地文件)
spark.read.format("json").load("/export/servers/spark/data.json")
//读取hdfs上的文件
spark.read.format("json").load("hdfs://192.168.220.25:9000/data.json")
三.写文件到本地或者hdfs
//将文件写入到本地
df.write.format("json").save("/export/servers/spark/output")
//将文件写入到hdfs
df.write.format("json").save("hdfs://192.168.220.25:9000/output")
①写入时可以添加mode(写入方式)
//以追加的方式写入
比如:df.write.format("json").mode("append").save("hdfs://192.168.220.25:9000/output")
mode介绍:error(默认)append(追加)overwrite(覆写)ignore(数据存在,则忽略)
//将文件写入到本地
df.write.format("json").save("/export/servers/spark/output")
//将文件写入到hdfs
df.write.format("json").save("hdfs://192.168.220.25:9000/output")
①写入时可以添加mode(写入方式)
//以追加的方式写入
比如:df.write.format("json").mode("append").save("hdfs://192.168.220.25:9000/output")
mode介绍:error(默认)append(追加)overwrite(覆写)ignore(数据存在,则忽略)
MySQL读取与存储
一.取出数据库中的数据
spark.read.format("jdbc")
.option("url","jdbc:mysql://master:3306/rdd")
.option("dbtable","user")
.option("user","root")
.option("password","mysql")
.load()
spark.read.format("jdbc")
.option("url","jdbc:mysql://master:3306/rdd")
.option("dbtable","user")
.option("user","root")
.option("password","mysql")
.load()
二.将数据写入到MySQL
df.write.mode("append")
.option("url","jdbc:mysql://master:3306/rdd")
.option("dbtable","user")
.option("user","root")
.option("password","mysql")
.save()
df.write.mode("append")
.option("url","jdbc:mysql://master:3306/rdd")
.option("dbtable","user")
.option("user","root")
.option("password","mysql")
.save()
Spark操作hive数据库
一.基本介绍
默认情况下,spark自带hive,可以直接写spark.sql("...")来操作内置的hive数据库
默认情况下,spark自带hive,可以直接写spark.sql("...")来操作内置的hive数据库
二.使用外部hive
①删除spark中内置的hive。即删除metastore_db和spark-warehouse文件夹
②将外部hive中的hive-site.xml文件复制到spark/conf中
③重启spark-shell
④这时spark.sql("...")访问的就是外部的hive了
⑤可以使用bin/spark-sql命令段操作hive
①删除spark中内置的hive。即删除metastore_db和spark-warehouse文件夹
②将外部hive中的hive-site.xml文件复制到spark/conf中
③重启spark-shell
④这时spark.sql("...")访问的就是外部的hive了
⑤可以使用bin/spark-sql命令段操作hive
Spark Streaming
基本介绍
一.基本介绍
1.Spark Straming用于流式数据处理。Spark Straming支持多种数据源输入,例如kafka,flume,twitter,zeroMQ和简单的TCP套接字等等。数据输入后可以用spark的高度抽象原语,如map,reduce,join,window进行操作运算。而结果也可以保存再很多地方,如HDFS,数据库等。
2.和spark基于RDD的概念很相似,spark streaming是用离散化流(discretized stream)作为抽象表示,叫做DStream。DStream是随着时间推移而收到的数据的序列。每个时间收到的数据都作为RDD存在,而DStream是由这些RDD组成的序列,因此得名“离散化
1.Spark Straming用于流式数据处理。Spark Straming支持多种数据源输入,例如kafka,flume,twitter,zeroMQ和简单的TCP套接字等等。数据输入后可以用spark的高度抽象原语,如map,reduce,join,window进行操作运算。而结果也可以保存再很多地方,如HDFS,数据库等。
2.和spark基于RDD的概念很相似,spark streaming是用离散化流(discretized stream)作为抽象表示,叫做DStream。DStream是随着时间推移而收到的数据的序列。每个时间收到的数据都作为RDD存在,而DStream是由这些RDD组成的序列,因此得名“离散化
二.Spark Steaming特点
1.易用
2.容错
3.易整合到spark体系
1.易用
2.容错
3.易整合到spark体系
WordCount案例实操
一.需求:使用netcat工具向9999端口不断的发送数据,通过sparkStreaming读取端口并统计不同单词的出现的次数
二.Linux安装netcat工具
yum install -y nc
nc -lk 9999 //向9999端口发送消息
yum install -y nc
nc -lk 9999 //向9999端口发送消息
三.代码演示
//使用SparkSteaming完成wordcount
//构建sparkConf对象
val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
//构建实时数据分析环境对象
//采集周期:以指定的时间为周期采集实时数据
val streamingContext:StreamingContext = new StreamingContext(conf, Seconds(3))
//从指定端口采集数据
val socketLineDStream:ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.220.25",9999)
//将采集的数据进行分解
val wordDStream:DStream[String] = socketLineDStream.flatMap(line => line.split(" "))
//将数据结构转换为k-v类型
val mapDStream:DStream[(String,Int)] = wordDStream.map(a => a -> 1)
//统计每个单词的数量
val mapToSumDStream:DStream[(String,Int)] = mapDStream.reduceByKey(_+_)
//将结果打印
mapToSumDStream.print()
//启动采集器
streamingContext.start()
//Driver等待采集器执行
streamingContext.awaitTermination()
//使用SparkSteaming完成wordcount
//构建sparkConf对象
val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
//构建实时数据分析环境对象
//采集周期:以指定的时间为周期采集实时数据
val streamingContext:StreamingContext = new StreamingContext(conf, Seconds(3))
//从指定端口采集数据
val socketLineDStream:ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.220.25",9999)
//将采集的数据进行分解
val wordDStream:DStream[String] = socketLineDStream.flatMap(line => line.split(" "))
//将数据结构转换为k-v类型
val mapDStream:DStream[(String,Int)] = wordDStream.map(a => a -> 1)
//统计每个单词的数量
val mapToSumDStream:DStream[(String,Int)] = mapDStream.reduceByKey(_+_)
//将结果打印
mapToSumDStream.print()
//启动采集器
streamingContext.start()
//Driver等待采集器执行
streamingContext.awaitTermination()
DStream创建
一.DStream
Spark Streaming原生支持一些不同的数据源。一些和核心的数据源已经被打包到Spark Streaming的maven工件中,而其他的一些数据源则可以通过Spark-Streaming-kafka等附加工件获取。每个接收器都以spark执行器程序中一个长期运行的任务形式运行,因此会占据分配给应用的CPU核心。此外,我们还需要有可用的CPU核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如:如果我们想要在流计算应用中运行10个接收器,那么至少为应用分配11个CPU核心,所以如果在本地运行,不要使用local[1]
Spark Streaming原生支持一些不同的数据源。一些和核心的数据源已经被打包到Spark Streaming的maven工件中,而其他的一些数据源则可以通过Spark-Streaming-kafka等附加工件获取。每个接收器都以spark执行器程序中一个长期运行的任务形式运行,因此会占据分配给应用的CPU核心。此外,我们还需要有可用的CPU核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如:如果我们想要在流计算应用中运行10个接收器,那么至少为应用分配11个CPU核心,所以如果在本地运行,不要使用local[1]
二.文件数据源
文件数据源:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方式进行读取,Spark Streaming将会监控dataDriectory目录不断移动进来的文件,记住目前不支持嵌套目录。
注意: ①文件需要有相同的数据格式
②文件进入dataDriectory的方式需要通过移动或者重命名来实现
③一旦文件移动进目录,则不能再修改,即便修改了也不会读取数据
文件数据源:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方式进行读取,Spark Streaming将会监控dataDriectory目录不断移动进来的文件,记住目前不支持嵌套目录。
注意: ①文件需要有相同的数据格式
②文件进入dataDriectory的方式需要通过移动或者重命名来实现
③一旦文件移动进目录,则不能再修改,即便修改了也不会读取数据
三.代码演示
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming02_FileDataSource")
//构建实时数据分析环境对象
//采集周期:以指定的时间为周期采集实时数据
val streamingContext:StreamingContext = new StreamingContext(conf, Seconds(5))
//从指定文件夹采集数据
val fileDStream:DStream[String] = streamingContext.textFileStream("D:\\Spark\\test")
//将采集的数据进行分解
val wordDStream:DStream[String] = fileDStream.flatMap(line => line.split(" "))
//将数据结构转换为k-v类型
val mapDStream:DStream[(String,Int)] = wordDStream.map(a => a -> 1)
//统计每个单词的数量
val mapToSumDStream:DStream[(String,Int)] = mapDStream.reduceByKey(_+_)
//将结果打印
mapToSumDStream.print()题
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming02_FileDataSource")
//构建实时数据分析环境对象
//采集周期:以指定的时间为周期采集实时数据
val streamingContext:StreamingContext = new StreamingContext(conf, Seconds(5))
//从指定文件夹采集数据
val fileDStream:DStream[String] = streamingContext.textFileStream("D:\\Spark\\test")
//将采集的数据进行分解
val wordDStream:DStream[String] = fileDStream.flatMap(line => line.split(" "))
//将数据结构转换为k-v类型
val mapDStream:DStream[(String,Int)] = wordDStream.map(a => a -> 1)
//统计每个单词的数量
val mapToSumDStream:DStream[(String,Int)] = mapDStream.reduceByKey(_+_)
//将结果打印
mapToSumDStream.print()题
四.RDD队列用法及说明
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列的RDD,都会作为一个DStream处理
五.需求
循环创建几个RDD,将RDD放入到队列中。通过SparkStreaming创建DStream,计算WordCount
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列的RDD,都会作为一个DStream处理
五.需求
循环创建几个RDD,将RDD放入到队列中。通过SparkStreaming创建DStream,计算WordCount
从Kafka中采集数据
一.用法和说明
在工程中需要引入Maven 工件spark-streaming-kafka_2.10 来使用它。包内提供的KafkaUtils 对象可以在StreamingContext 和JavaStreamingContext 中以你的Kafka 消息创建出DStream。由于KafkaUtils 可以订阅多个主题,因此它创建出的DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用StreamingContext 实例、一个由逗号隔开的ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用createStream() 方法
在工程中需要引入Maven 工件spark-streaming-kafka_2.10 来使用它。包内提供的KafkaUtils 对象可以在StreamingContext 和JavaStreamingContext 中以你的Kafka 消息创建出DStream。由于KafkaUtils 可以订阅多个主题,因此它创建出的DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用StreamingContext 实例、一个由逗号隔开的ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用createStream() 方法
二.添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
三.代码演示
package sparkStreaming
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
object test01 {
def main(args: Array[String]): Unit = {
//待接收的topic名称
val topics = "name1,spark"
//broker地址
val brokers = "192.168.220.25:9092"
//创建SparkConf对象
val conf = new SparkConf().setMaster("local[*]").setAppName("kafka test")
//创建ssc对象
val ssc = new StreamingContext(conf,Seconds(3))
//分割待接收topic
val topicsSet = topics.split(",").toSet
//连接kafka的配置文件
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
//根据配置文件获取Kafka消息
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet
)
//读取Kafka消息
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" ")).map(x=>(x,1))
words.reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}
package sparkStreaming
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
object test01 {
def main(args: Array[String]): Unit = {
//待接收的topic名称
val topics = "name1,spark"
//broker地址
val brokers = "192.168.220.25:9092"
//创建SparkConf对象
val conf = new SparkConf().setMaster("local[*]").setAppName("kafka test")
//创建ssc对象
val ssc = new StreamingContext(conf,Seconds(3))
//分割待接收topic
val topicsSet = topics.split(",").toSet
//连接kafka的配置文件
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
//根据配置文件获取Kafka消息
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet
)
//读取Kafka消息
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" ")).map(x=>(x,1))
words.reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}
DStream转换
一.DStream转换基本介绍
DStream上的原语于RDD类似,分为transformation(转换)和output operation(输出)两种,此外转换操作中还有一些比较特殊的原语,比如:updateStateByKey() transform()以及各种Window相关的原语。
DStream上的原语于RDD类似,分为transformation(转换)和output operation(输出)两种,此外转换操作中还有一些比较特殊的原语,比如:updateStateByKey() transform()以及各种Window相关的原语。
二.无状态转化操作
无状态转化就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。简单的说就是每个批次的数据没有关联
无状态转化就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。简单的说就是每个批次的数据没有关联
三.有状态转化操作 (重点)
1.UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为键值对
2.updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
① 定义状态,状态可以是一个任意的数据类型。
② 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
③使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
① 定义状态,状态可以是一个任意的数据类型。
② 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
③使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态
window函数
一.Window Operations(窗口操作)介绍
Window Operations可以设置窗口的大小和滑动的间隔来动态的获取当前Streaming的用于状态
Window Operations可以设置窗口的大小和滑动的间隔来动态的获取当前Streaming的用于状态
//设置窗口函数
//窗口大小应该为采集周期的整数倍,窗口滑动的步长也应该为采集周期的整数倍
val window:DStream[(String,String)] = kafkaDStream.window(Seconds(9),Seconds(3))
//窗口大小应该为采集周期的整数倍,窗口滑动的步长也应该为采集周期的整数倍
val window:DStream[(String,String)] = kafkaDStream.window(Seconds(9),Seconds(3))
0 条评论
下一页