Spark基础
2023-06-04 11:25:59 3 举报
AI智能生成
详细总结了spark的基础知识点,包含spark的RDD,SparkSQL,SparkStreaming等等
作者其他创作
大纲/内容
Spark SQL
Spark SQL概述
定义
Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,将Spark SQL转换成RDD,然后提交到集群执行
特点
易整合
统一的数据访问方式
兼容Hive
标准的数据连接
DataFrame
定义
DataFrame是一个分布式数据容器,除了数据以外,还记录数据的结构信息,即schema,也支持嵌套数据类型(struct、array和map)
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
DataFrame也是懒执行的。性能上比RDD要高,主要原因:
优化的执行计划:查询计划通过Spark catalyst optimiser进行优化
逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程
与RDD的关系图
分支主题
DataSet
DataSet比DataFrame多展示了数据的类型
SparkSQL编程
SparkSession
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合
RDD、DataFrame、DataSet
RDD - DataFrame
RDD -> DataFrame
// RDD转换为DF,DS时,需要增加隐式转换,需要引入spark环境对象的隐式转换规则
import spark.implicits._
val df = rdd.toDF("id","name","age")
DataFrame -> RDD
val rdd1 = df.rdd
RDD DataSet
RDD -> DataSet
val userRDD = rdd.map {
case (id, name, age) => {
User(id, name, age)
}
}
分支主题
val userDS = userRDD.toDS()
DataSet -> RDD
val rdd3 = userDS.rdd
DataFrame - DataSet
DataFrame -> DataSet
val ds = df.as[User]
DataSet -> DataFrame
val df1 = ds.toDF()
图示
分支主题
共性与差异
共性
三者均是Spark平台下的分布式弹性数据集
都有惰性机制,Action时才运算
根据内存,自动缓存运算
均有partition概念
import spark.implicits._,需要导入此包支持操作
差异
RDD不支持SparkSQL
与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
用户自定义函数
// 1. 继承UserDefinedAggregateFunction// 2. 重写方法
// 主方法中注册函数
SparkSQL数据源
通用加载/保存方法
手动指定选项
Spark SQL的默认数据源为Parquet格式
修改配置项spark.sql.sources.default,可修改默认数据源格式
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet)
如果数据源格式为内置格式,则只需要指定简称定json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式
spark.read.format("json").load
peopleDF.write.format("parquet").save
文件保存选项
SaveMode.ErrorIfExists(default)
SaveMode.Append
SaveMode.Overwrite
SaveMode.Ignore
JSON文件
Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row].
可以通过SparkSession.read.json()去加载一个 一个JSON 文件
spark.read.json
Parquet文件
Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录
Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法
spark.read.parquet
JDBC
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame
通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中
需要将相关的数据库驱动放到spark的类路径下
Hive数据库
内嵌Hive使用
外部Hive应用
SparkSQL实战
SparkStreaming
Spark Streaming概述
定义
Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列
特点
易用
容错
易整合
架构
分支主题
DStream入门
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
DStream创建
文件数据源
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取
注意事项
文件需要有相同的数据格式;
文件进入 dataDirectory的方式需要通过移动或者重命名来实现;
一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
RDD队列
ssc.queueStream(queueOfRDDs)
用法
//3.创建RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//8.循环创建并向RDD队列中放入RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
自定义数据源
用法
//3.创建自定义receiver的Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
CustomerReceiver
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
分支主题
//最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
分支主题
//读数据并将数据发送给Spark
def receive(): Unit = {
分支主题
//创建一个Socket
var socket: Socket = new Socket(host, port)
分支主题
//定义一个变量,用来接收端口传过来的数据
var input: String = null
分支主题
//创建一个BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
分支主题
//读取数据
input = reader.readLine()
分支主题
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
分支主题
//跳出循环则关闭资源
reader.close()
socket.close()
分支主题
//重启任务
restart("restart")
}
分支主题
override def onStop(): Unit = {}
}
Kafka数据源
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它
用法
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
DStream转换
DStream输出
Spark概述
Hadoop小剧场
Hadoop1.x版本的问题
分支主题
Hadoop2.x版本
分支主题
Spark小剧场
为什么使用函数式编程
分支主题
什么是Spark
Spark是基于内存的快速、通用。可扩展的大数据分析引擎
Spark内置模块
模块分区
Spark SQL 结构化数据 | Spark Streaming 实时计算
Spark Core
独立调度器 | Yarn | Mesos
模块解释
Spark Core
实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
Spark SQL
是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等
Spark Streaming
是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应
Spark MLlib
提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能
集群管理器
Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。
Spark运行模式
重要角色
Driver(驱动器)
概述
Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行
Driver:任务的调度和切分
功用
把用户程序转为作业(JOB)
跟踪Executor的运行状况
为执行器节点调度任务
UI展示应用运行状况
Executor(执行器)
概述
Spark Executor是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。
Executor:任务的执行
功用
负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
图示
分支主题
运行模式
Local模式
模式
local
所有计算运行在一个线程中
local[K]
指定使用几个线程来运行计算,通常CPU有几核,就指定几个线程
local[*]
按CPU内部最多的Cores设置线程数
基本语法
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
参数说明
--master 指定Master的地址,默认为Local
--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”
application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
Standalone模式
Yarn模式
SparkCore
RDD概述
什么是RDD
Spark是一个分布式数据集的分析框架,将计算单元缩小为更适合分布式计算和并行计算的模型,称之为RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。
弹性
自动进行内存和磁盘数据存储的切换
基于Lineage(血统)的高效容错机制
Task如果失败,自动进行特定次数的重试
Stage如果失败,自动进行特定次数的重试
checkpoint和persist,可主动或被动触发
数据调度弹性:DAGScheduler、TaskScheduler与资源管理无关
数据分片的高度弹性(coalesce)
分布式
数据的来源
数据集
数据的类型 & 计算逻辑的封装 (数据模型)
代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
不可变
计算逻辑不可变
可分区
提高数据处理能力
并行计算
多任务同时执行
图示
分支主题
RDD的属性
一组分区(Partition)即数据集的基本组成单位
一个计算各个分区间的函数
一个有关于各个RDD间依赖关系的列表
一个存储存取每个Partition的优先位置(preferred location)的列表
优先位置是为了利于计算
一个关于键值key-value分片的Partitioner
RDD的特点
分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的
计算的时候会通过一个compute函数得到每个分区的数据
如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据
如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换
只读
要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系
一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中
依赖
RDDs维护着操作算子转换的血缘关系,即依赖
依赖的分类
窄依赖
一对一
宽依赖
多对多
图示
分支主题
缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,计算一次后进入缓存,就不再根据血缘关系计算
Checkpoint
迭代会使RDDs之间的血缘关系变长,因此RDD引入checkpoint,将数据持久化存储,从而减轻血缘关系的依赖
RDD编程
RDD的创建
从集合中创建RDD(内存)
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
从外部的存储创建RDD(硬盘)
val rdd2= sc.textFile("hdfs://hadoop131:9000/RELEASE")
从其他RDD创建(转换)
RDD的转换
Value类型
map(fuc)
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
mapPartitions(fuc)
类似于map,但独立地在RDD的每一个分片(分区)上运行.
map()和mapPartitions()的区别
map():每次处理一条数据
mapPartitions():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM
当内存空间较大的时候建议使用mapPartitions(),以提高处理效率
mapPartitionsWithIndex(fuc)
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
flatMap(fuc)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
glom(配合flatmap可以合并分区)(fuc)
将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
groupBy(fuc)
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
filter(fuc)
过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
sample(withReplacement, fraction, seed)
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
distinct([numTasks])
对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它
coalesce(numPartitions)
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
repartition(numPartitions)
根据分区数,重新通过网络随机洗牌所有数据
coalesce和repartition的区别
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定
repartition实际上是调用的coalesce,默认是进行shuffle的
sortBy(func,[ascending], [numTasks])
使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序
双Value类型交互
union(otherDataset)
并集
subtract (otherDataset)
差集
intersection(otherDataset)
交集
cartesian(otherDataset)
笛卡尔积
zip(otherDataset)
将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常
Key-Value类型
partitionBy
对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程
可以通过指定的分区器决定数据计算的分区,spark默认的分区器为HashPartitioner
groupByKey
groupByKey也是对每个key进行操作,但只生成一个sequence
reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置
reduceByKey和groupByKey的区别
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]
groupByKey:按照key进行分组,直接进行shuffle
一般情况下reduceByKey比groupByKey更好,但要注意combine的使用条件必须是不影响最终的业务逻辑
aggregateByKey
参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
即计算两次,一次分区内,一次分区外
foldByKey
aggregateByKey分区内和分区外计算规则相同可用foldByKey
combineByKey[C]
(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
createCombiner
mergeValue
mergeCombiners
将第一个key出现的v转换结构计算规则,第二个参数表示分区内计算规则,第三个参数表示分区间计算规则
sortByKey([ascending], [numTasks])
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
mapValues
针对于(K,V)形式的类型只对V进行操作
join(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
Action
reduce(func)
通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
collect()
在驱动程序中,以数组的形式返回数据集的所有元素。
count()
返回RDD中元素的个数
first()
返回RDD中的第一个元素
take(n)
返回一个由RDD的前n个元素组成的数组
takeOrdered(n)
返回该RDD排序后的前n个元素组成的数组
aggregate
(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
fold(num)(func)
折叠操作,aggregate的简化操作,seqop和combop一样
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数
foreach(func)
在数据集的每一个元素上,运行函数func进行更新
RDD中的函数传递
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的
函数的序列化 extends Serializable
传递方法
传递属性
RDD依赖关系
Lineage
在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区
RDD的Lineage会记录RDD的元数据信息和转换行为
如何查看Lineage
.toDebugString
如何查看依赖类型
.dependencies
窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
一对一关系
宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle
多对多关系
DAG(Directed Acyclic Graph)
原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据
图示
分支主题
任务划分
名词解释
Application
初始化一个SparkContext即生成一个Application,Driver也可理解为Application
Job
一个Action算子就会生成一个Job,只针对于Action算子
Stage
根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage
Task
Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task
Application->Job->Stage-> Task每一层都是1对n的关系
RDD缓存
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
RDD CheckPoint
本质是通过将RDD写入Disk做检查点
检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能
在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发
键值对RDD数据分区器
概述
Hash
Hash分区为当前的默认分区
Range
Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
获取RDD分区
partitioner属性
Hash分区
计算key的hashcode,与分区个数取余,小于0则加分区个数,大于0则加0
弊端:导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据
Ranger分区
将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,但是分区内的元素是不能保证顺序的。
实现步骤
从整个RDD中抽取样本并排序,计算每个分区最大key,形成数组变量
判断key在rangeBounds中的范围,赋予id下标
自定义分区
继承Partitioner
实现方法
numPartitions: Int:返回创建出来的分区数。
getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
equals():Java
判断相等性的标准方法
数据读取与保存
文件类数据读取与保存
Text文件
读取textFile
保存saveAsTextFile
Json文件
使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件
import scala.util.parsing.json.JSON
读取textFile
解析map
Csv文件
Sequence文件
Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)
SequenceFile文件只针对PairRDD
读取sequenceFile()
保存saveAsSequenceFile()
Object文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制
读取objectFile
保存saveAsObjectFile
文件系统类数据读取与保存
HDFS
最抽象的两个函数接口
hadoopRDD
输入格式(InputFormat)
键类型
值类型
分区值
newHadoopRDD
输入格式(InputFormat)
键类型
值类型
分区值
备注
在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取
Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
map-reduce如何读取某一类型数据,将该对应读取方式改写为上述的两种接口
MySQL数据库
jdbcRDD
添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
代码详解
MySQL读取
// 定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
MySQL写入
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
HBase数据库
概述
Spark 可以通过Hadoop输入格式访问HBase
添加依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
代码详解
HBase读取
//构建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//从HBase读取数据形成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
HBase写入
//创建HBaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
分支主题
//构建Hbase表描述器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
分支主题
//创建Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
分支主题
//定义往Hbase插入数据的方法
def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
分支主题
//创建一个RDD
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
分支主题
//将RDD内容写到HBase
val localData = initialRDD.map(convert)
分支主题
localData.saveAsHadoopDataset(jobConf)
}
RDD编程进阶
Spark三大数据结构
RDD
弹性分布数据集
累加器
分布式共享只写数据
广播变量
分布式共享只读数据
累加器
系统累加器
所有分片处理时更新共享变量
自定义累加器
extends org.apache.spark.util.AccumulatorV2
广播变量(调优策略)
val broadcastVar = sc.broadcast(Array(1, 2, 3))
全局可读
扩展
0 条评论
下一页
为你推荐
查看更多