Spark原生架构
2022-09-10 09:26:26 0 举报
AI智能生成
spark大纲架构,系统的归纳spark技术
作者其他创作
大纲/内容
Datasets & DataFrames
基础
一个 DataFrame 是一个 Dataset 组成的指定列
DataFrame API 可以在 Scala,Java,Python,和 R中实现
DataFrame 也有丰富的函数库,包括 string 操作,date 算术
Dataset 与 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者通过网络进行传输的对象
Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset
使用反射去推断一个包含指定的对象类型的 RDD 的 Schema
创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口
GraphX
简介
GraphX 是 Spark 中用于图形和图形并行计算的新组件
GraphX 公开了一组基本运算符(例如: subgraph,joinVertices 和 aggregateMessages)
GraphX 还包括越来越多的图形算法 和 构建器
属性 Graph
属性 Graph 是一个定向多重图形,用户定义的对象附加到每个顶点和边缘
示例属性 Graph
Graph 运算符
map,filter,以及 reduceByKey
Property 运算符
与 RDD map 运算符一样
Structural 运算符
Join 运算符
使用图形连接来自外部收集(RDD)的数据
邻域聚合
聚合消息 (aggregateMessages)
Map Reduce Triplets Transition Guide (Legacy)
计算级别信息
是计算每个顶点的程度
收集相邻点
在每个顶点处收集相邻顶点及其属性可以更容易地表达计算
Caching and Uncaching
Pregel API
Graph 建造者
GraphLoader.edgeListFile
Graph.apply
Graph.fromEdges
Graph.fromEdgeTuples
Vertex and Edge RDDs
VertexRDDs
EdgeRDDs
定义的各种分区策略之一来组织块中的边 PartitionStrategy
优化表示
Graph 算法
PageRank
连接组件
Triangle 计数
MLlib
RDD-API
数据类型
基本统计
分类和回归
协同过滤
聚类
维度降低
特征提取和转换
频繁模式挖掘
评估指标
PMML模型导出
基本统计
数据源
管道
提取,转换和选择功能
分类和回归
聚类
协同过滤
频繁模式挖掘
模型选择和调整
高级主题
数据类型
基本统计
分类和回归
协同过滤
聚类
维度降低
特征提取和转换
频繁模式挖掘
评估指标
PMML模型导出
优化(开发人员)
Spark SQL
简介
Spark SQL 是 Spark 处理结构化数据的一个模块
Spark SQL 提供了查询结构化数据及计算结果等信息的接口
Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据
编程语言运行SQL 时,查询结果将以 Dataset/DataFrame的形式返回
可以使用 命令行或者通过 JDBC/ODBC与 SQL 接口
Spark SQL中所有功能的入口点是 SparkSession 类
Spark SQL能够把RDD 转换为一个DataFrame,并推断其类型
Data Sources(数据源)
Generic Load/Save Functions(通用 加载/保存 功能)
Manually Specifying Options(手动指定选项)
Run SQL on files directly(直接在文件上运行 SQL)
Save Modes(保存模式)
Saving to Persistent Tables(保存到持久表)
Bucketing, Sorting and Partitioning(分桶,排序和分区)
Parquet Files
Loading Data Programmatically(以编程的方式加载数据)
Partition Discovery(分区发现)
Schema Merging(模式合并)
Hive metastore Parquet table conversion(Hive metastore Parquet table 转换)
Hive/Parquet Schema Reconciliation
Metadata Refreshing(元数据刷新)
Configuration(配置)
spark.sql.parquet.binaryAsString
spark.sql.parquet.int96AsTimestamp
spark.sql.parquet.cacheMetadata
spark.sql.parquet.compression.codec
spark.sql.parquet.filterPushdown
spark.sql.hive.convertMetastoreParquet
spark.sql.parquet.mergeSchema
spark.sql.optimizer.metadataOnly
Hive 表
指定 Hive 表的存储格式
与不同版本的 Hive Metastore 进行交互
性能调优
在内存中缓存数据
spark.sql.inMemoryColumnarStorage.compressed
spark.sql.inMemoryColumnarStorage.batchSize
其他配置选项
spark.sql.files.maxPartitionBytes
spark.sql.files.openCostInBytes
spark.sql.broadcastTimeout
spark.sql.autoBroadcastJoinThreshold
spark.sql.shuffle.partitions
分布式 SQL 引擎
运行 Thrift JDBC/ODBC 服务器
运行 Spark SQL CLI
基础
简介
Apache Spark 是一个快速的,通用的集群计算系统
它对 Java,Scala,Python 和 R 提供了的高层 API
有一个经优化的支持通用执行图计算的引擎
还支持一组丰富的高级工具,包括用于 SQL 和结构化数据处理的 Spark SQL
用于机器学习的 MLlib
用于图计算的 GraphX 和 Spark Streaming
安全
Spark RPC(Spark 进程之间的通信协议)
Spark 支持基于 AES 的 RPC 连接加密
Spark 支持加密写入本地磁盘的临时数据
网页界面认证和授权
身份验证ACL
配置默认 SSL 设置,这些设置将用于所有支持的通信协议
Spark钥匙库
HTTP 安全报头
为网络安全配置端口
使用密钥表
使用票证缓存
Kubernetes 的安全交互
集群运行模式
Amazon EC2: 可让您在5分钟左右的时间内在EC2上启动集群的脚本
Standalone 部署模式: 此模式下无需第三方集群管理器即可快速启动独立集群
Mesos: 使用 Apache Mesos 部署私有集群
YARN: 在Hadoop NextGen(YARN)之上部署Spark
Kubernetes: 在Kubernetes上部署Spark
Spark Shell
Spark shell 提供了一种来学习该 API 比较简单的方式,以及一个强大的来分析数据交互的工具
Spark 的主要抽象是一个称为 Dataset 的分布式的 item 集合
Dataset
Dataset actions(操作)和transformations(转换)可以用于更复杂的计算
map->reduce
缓存
park 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存
编程
概述
每一个 Spark 应用程序由一个在集群上运行着用户的 main 函数和执行各种并行操作的 _driver program_(驱动程序)组成
Spark 提供的主要抽象是一个 _弹性分布式数据集_(RDD),它是可以执行并行操作且跨集群节点的元素的集合
支持共享变量
_broadcast variables_(广播变量)
_accumulators_(累加器)
Spark 依赖
Spark 2.2.0 默认使用 Scala 2.11 来构建和发布直到运行
HDFS 版本添加一个 hadoop-client(hadoop 客户端)依赖
初始化 Spark
创建一个 SparkContext 对象,它会告诉 Spark 如何访问集群
使用 spark-submit 启动应用
弹性分布式数据集(RDDs)
简介
Spark 主要以一个 弹性分布式数据集_(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合
并行集合
调用 SparkContext 的 parallelize 方法来创建并行集合
可以并行操作的 distributed dataset(分布式数据集)中复制到另一个 dataset(数据集)
外部 Datasets(数据集)
从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集)
本地文件系统,HDFS,Cassandra,HBase,Amazon S3
Spark 支持文本文件,SequenceFiles
任何其它的 Hadoop InputFormat
注意事项
如果使用本地文件系统的路径,所工作节点的相同访问路径下该文件必须可以访问
所有 Spark 基于文件的 input 方法,包括 textFile,支持在目录上运行,压缩文件,和通配符
textFile 方法也可以通过第二个可选的参数来控制该文件的分区数量
RDD 操作
_transformations(转换)_,它会在一个已存在的 dataset 上创建一个新的 dataset,所有的 transformations 都是 _lazy(懒加载的)
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
_actions(动作)_,将在 dataset 上运行的计算后返回到 driver 程序
reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement, num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)
Shuffle 操作
mapPartitions 对每个 partition 分区进行排序,例如,.sorted
repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
sortBy 对 RDD 进行全局的排序
RDD Persistence(持久化)
使用 persist() 方法
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
(Java and Scala)
MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2
MEMORY_AND_DISK_2
cache() 方法
删除数据
RDD.unpersist()
Spark Streaming
简介
Spark Core API 的扩展
支持弹性的,高吞吐的,容错的实时数据流的处理
数据从Kafka,Flume,Kinesis 以及 TCP sockets多种数据源获取
map,reduce,join,window函数处理
data streams(数据流)上使用 机器学习 以及 图计算 算法
原理
Spark Streaming 接收实时输入数据流并将数据切分成多个 batch(批)数据
Spark 引擎处理它们以生成最终的 stream of results in batches(分批流结果)
依赖
Spark Streaming 可以通过 Maven 来管理依赖
Spark Streaming Core API 中不存在的数据源中获取数据,如 Kafka,Flume,Kinesis
初始化 StreamingContext
StreamingContext是所有的 Spark Streaming 功能的主入口点
StreamingContext 对象也可以从一个现有的 SparkContext 对象来创建
上下文初始后
通过创建输入 DStreams 来定义输入源。
通过应用转换和输出操作 DStreams 定义流计算(streaming computations)。
开始接收输入并且使用 streamingContext.start() 来处理数据。
使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误)。
使用 streamingContext.stop() 来手动的停止处理。
注意事项
一旦一个 context 已经启动,将不会有新的数据流的计算可以被创建或者添加到它。
一旦一个 context 已经停止,它不会被重新启动。
同一时间内在 JVM 中只有一个 StreamingContext 可以被激活。
在 StreamingContext 上的 stop() 同样也停止了 SparkContext。为了只停止 StreamingContext,设置 stop() 的可选参数,名叫 stopSparkContext 为 false。
一个 SparkContext 就可以被重用以创建多个 StreamingContexts,只要前一个 StreamingContext 在下一个StreamingContext 被创建之前停止(不停止 SparkContext)。
两种内置的 streaming source
_Basic sources(基础的数据源)_
_Advanced sources(高级的数据源)
source
File Streams: 用于从文件中读取数据
Streams based on Custom Receivers(基于自定义的接收器的流
Queue of RDDs as a Stream(RDDs 队列作为一个流)
高级Source
Kafka: Spark Streaming 2.2.0 与 Kafka broker 版本 0.8.2.1
Flume: Spark Streaming 2.2.0 与 Flume 1.6.0 相兼容
Kinesis: Spark Streaming 2.2.0 与 Kinesis Client Library 1.2.1
自定义 Sources(数据源)
Receiver Reliability(接收器的可靠性)
Reliable Receiver(可靠的接收器)
Unreliable Receiver(不可靠的接收器)
Join 操作
Stream-stream joins
leftOuterJoin,rightOuterJoin,fullOuterJoin
join window stream(窗口流)与 dataset
DStreams 上的输出
print()
Python API 这在 Python API 中称为 **pprint()**。
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
Python API 这在Python API中是不可用的。
saveAsHadoopFiles(prefix, [suffix])
Python API 这在Python API中是不可用的。
foreachRDD(func)
MLlib 操作
Streaming 线性回归,Streaming KMeans 等
离线学习模型
缓存 / 持久性
reduceByWindow
reduceByKeyAndWindow
updateStateByKey
Checkpointing
Metadata checkpointing 定义 streaming 计算的信息保存到容错存储(如 HDFS)
Data checkpointing - 将生成的 RDD 保存到可靠的存储
使用状态转换 - 如果在应用程序中使用 updateStateByKey或 reduceByKeyAndWindow(具有反向功能)
从运行应用程序的 driver 的故障中恢复 - 元数据 checkpoint 用于使用进度信息进行恢复
streamingContext.checkpoint(checkpointDirectory)配置
部署要求
集群管理器集群
打包应用程序 JAR
为 executor 配置足够的内存
配置 checkpoint
配置应用程序 driver 的自动重新启动
配置预写日志
设置最大接收速率
monitor监控
running receivers(运行接收器)
receivers(接收器)是否处于 active(活动状态)
接收到的 records(记录)数
receiver error(接收器错误)
完成 batches(批次)
batch processing times(批处理时间)
queueing delays(排队延迟)
重要指标
Processing Time(处理时间)
Scheduling Delay(调度延迟)
Performance Tuning(性能调优)
通过有效利用集群资源
设置正确的 batch size(批量大小)
Reducing the Batch Processing Times(减少批处理时间)
Level of Parallelism in Data Receiving(数据接收中的并行级别)
Level of Parallelism in Data Processing(数据处理中的并行度水平)
Setting the Right Batch Interval(设置正确的批次间隔)
Data Serialization(数据序列化)
Input data(输入数据)
Persisted RDDs generated by Streaming Operations(流式操作生成的持久 RDDs)
内存调优
Persistence Level of DStreams(DStreams 的持久性级别)
Clearing old data(清除旧数据)
CMS Garbage Collector(CMS垃圾收集器)
Other tips(其他提示):为了进一步降低 GC 开销
语义
_At most once(最多一次)
_At least once(至少一次)
_Exactly once(有且仅一次)
基本语义
_Receiving the data(接收数据)
_Transforming the data(转换数据)
_Pushing out the data(推出数据)
组件
Cluster Manager 类型
Standalone -- 包含在 Spark 中使得更容易安装集群的一个简单 Cluster Manage
Apache Mesos -- 一个通用的 Cluster Manager,它也可以运行 Hadoop MapReduce 和其它服务应用。
Client Mode(客户端模式)
Cluster mode(集群模式)
Mesos 运行模式
Coarse-Grained(粗粒度)
Fine-Grained (deprecated)(细粒度,不推荐)
Hadoop YARN -- Hadoop 2 中的 resource manager(资源管理器)。
Kubernetes -- 用于自动化部署、扩展和管理容器化应用程序的开源系统
高级工具
集群范围的监控工具,例如 Ganglia可以提供对整体集群利用率和资源瓶颈的洞察。例如,Ganglia 仪表板可以快速显示特定工作负载是否为磁盘绑定,网络绑定或 CPU 绑定。
操作系统分析工具,如 dstat,iostat 和 iotop 可以在单个节点上提供细粒度的分析。
JVM 实用程序,如 jstack 提供堆栈跟踪,jmap 用于创建堆转储,jstat 用于报告时间序列统计数据和 jconsole 用于可视化地浏览各种 JVM 属性对于那些合适的 JVM 内部使用是有用的
Spark 调优
数据序列化
内存调优
内存大小: 对象使用的内存 大小 (你可能希望整个 dataset 都能在内存中)
成本: 访问对象的成本
开销: gc 开销(如果对象的周转率很高)。
内存管理概论
执行内存: 是指用于 shuffle 、join、sort 和 aggregation 的计算
存储内存: 是指用于在集群中 cache 和传播内部数据的内存。
确定内存消耗
优化数据结构
序列化 RDD 存储
GC优化
测量GC的影响
高级GC调优
其它考虑
并行级别
Reduce任务内存使用
广播大变量
数据局部性
作业调度
跨应用调度
standalone 和 YARN调度
Mesos
动态资源分配
配置和部署
资源分配策略
请求策略
移除策略
优雅的关闭Executor(执行器)
应用内调度
公平调度资源池
资源池默认行为
配置资源池属性
0 条评论
下一页