SparkContext初始化
2019-08-29 17:18:06 0 举报
AI智能生成
为你推荐
查看更多
Sparkcontext初始化
作者其他创作
大纲/内容
SparkContext初始化
ContextCleaner
作用
清除临时数据
初始化代码
_cleaner = if (_conf.getBoolean(\"spark.cleaner.referenceTracking\
总结
默认开启
EventLoggingListener(可选组件)
EventLoggingListener 是将事件持久化到存储的监听器,是 SparkContext 中可选组件
spark.eventLog.enabled(默认为 false)
ExecutorAllocationManager(可选组件)
ExecutorAllocationManager可以动态的分配最小Executor的数量、动态分配最大Executor的数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验
默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建
start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态的添加、删除Executor
通过不断添加Executor,遍历Executor,将超时的Executor杀死并移除
ShutdownHookManager
ShutdownHookManager的创建是在SparkContext中,为了在Spark程序挂掉的时候,处理一些清理工作。
_shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo(\"Invoking stop() from shutdown hook\") try { stop() } catch { case e: Throwable => logWarning(\"Ignoring Exception while stopping SparkContext from shutdown hook\
释放资源
删除文件
SparkEnv
Spark运行环境
SparkEnv主要创建的对象:
securityManager (安全管理器)
rpcEnv (rpc通信环境)
serializerManager (序列化管理器)
broadcastManager (广播变量管理器)
mapOutputTracker (map输出跟踪器)
shuffleManager (shuffle管理)
memoryManager (内存管理)
blockManager (块管理)
metricsSystem (计量系统)
outputCommitCoordinator (输出提交控制器)
createDriverEnv
createExecutorEnv
create
SparkEnv的使用
Hadoop 相关配置
默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关的配置信息
private var _hadoopConfiguration: Configuration = __hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
将Amazon S3文件系统的AWS_ACCESS_KEY_ID和 AWS_SECRET_ACCESS_KEY加载到Hadoop的Configuration
将SparkConf中所有的以spark.hadoop.开头的属性都赋值到Hadoop的Configuration
将SparkConf的属性spark.buffer.size复制到Hadoop的Configuration的配置io.file.buffer.size
LiveListenerBus
LiveListenerBus是SparkContext中的事件总线。它异步地将事件源产生的事件(SparkListenerEvent)投递给已注册的监听器(SparkListener)。Spark中广泛运用了监听器模式,以适应集群状态下的分布式事件汇报。
private var _listenerBus: LiveListenerBus = __listenerBus = new LiveListenerBus(_conf)
LiveListenerBus继承了SparkListenerBus,并实现了将事件异步投递给监听器,达到实时刷新UI界面数据的效果。
工作流程图
图中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件来源,它们都是通过调用LiveListenerBus的post方法将消息交给异步线程listenerThread处理的
1.概述
1.Spark Driver 的初始化始终围绕着 SparkContext 的初始化。
2.SparkContext 是 Spark 中的元老级 A1PI, 从 0.x.x 版本就已经存在。
3.SparkContext组件:
AppStatusStore
SparkUI
SparkStatusTracker
ConsoleProgressBar
DAGScheduler
SchedulerBackend
TaskScheduler
HeartbeatReceiver
JobProgressListener
4.属性:
creationSite
allowMultipleContexts
startTime
stopped
addedFiles
addedlars
persistentRdds
executorEnvs
sparkUser
checkpointDir
localProperties
_conf
_jars
_files
_eventLogDir
_eventLogCodec
_hadoopConfiguration
_executorMemory
_applicationId
_applicationAttemptId
_listenerBusStarted
nextShuffleId
nextRddId
5.代码
SparkUI 提供了用浏览器访问具有样式及布局并且提供丰富监控数据的页面。
attachTab()方法注册Tab
ExecutorsTab
JobsTab
StagesTab
EnvironmentTab
StorageTab
其采用的是事件监听机制。发送的事件会存入缓存,由定时调度器取出后分配给监听此事件的监听器对监控数据进行更新。
如果不需要SparkUI,则可以将spark.ui.enabled置为false。
主要依赖于流行的Servlet容器Jetty实现
Spark Web UI(Spark2.3之前)Web UI的数据来自度量系统
Spark Web UI(Spark2.3之后)Web UI的数据来自AppStatusStore
spark.ui.killEnabled(在Spark Web UI界面中展示强行杀掉Spark Application Job的开关,默认为true)
AppStatusStore提供Spark程序运行中各项监控指标的键值对化存储。Web UI中见到的数据指标基本都存储在这里
private var _statusStore: AppStatusStore = __statusStore = AppStatusStore.createLiveStore(conf)
AppStatusStore是在Spark 2.3.0版本才加入的,一个KVStore包装器,允许跟踪特定类型的元素数量和触发一旦达到阈值就采取行动。 例如,这允许编写者控制多少数据通过在添加新数据时可能删除旧数据来存储。
AppStatusStore的构造依赖
ElementTrackingStore(为键值对存储KVStore)
AppStatusListener
AppStatusStore的实现依赖InMemoryStore与ElementTrackingStore,数据来源通过AppStatusListener写入
SparkStatusTracker提供报告最近作业执行情况的低级API。
它的内部只有6个方法,从AppStatusStore中查询并返回诸如Job/Stage ID、活跃/完成/失败的Task数、Executor内存用量等基础数据。
它只能保证非常弱的一致性语义,也就是说它报告的信息会有延迟或缺漏。
SparkStatusTracker负责对Job和Stage的监控,实际也是使用了JobProgressListener中的监控数据,并额外进行了一些加工
ConsoleProgressBar显示Stage的进度
_progressBar = if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None }
通过轮询SparkStatusTracker来获取数据,通过spark.ui.consoleProgress.update.interval设置时长,默认200毫秒
开启
spark.ui.showConsoleProgress=true(默认开启)
日志级别至少是WARN
DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据
_dagScheduler = new DAGScheduler(this)
RDD的action操作,将会触发handleJobSubmitted方法
处理的时候,会先创建一个resultStage,每个job只有一个resultstage,其余的都是shufflestage.然后根据rdd的依赖关系,按照广度优先的思想遍历rdd,遇到shufflerdd就创建一个新的stage。
形成DAG图后,遍历等待执行的stage列表,如果这个stage所依赖的父stage执行完了,它就可以执行了;否则还需要继续等待。
最终stage会以taskset的形式,提交给TaskScheduler,然后最后提交给excutor。
HeartbeatReceiver是心跳接收器。Executor需要向Driver定期发送心跳包来表示自己存活。它本质上也是个监听器,继承了SparkListener
需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,因为Executor将在构造函数中检索“HeartbeatReceiver”
executor每10s心跳一次startDriverHeartbeater(),conf配置spark.executor.heartbeatInterval
executor心跳超时时间120s,conf配置spark.network.timeout
SchedulerBackend是Apache Spark 中任务调度系统的抽象,可以调度来自各种集群管理器的资源(例如内置的Spark Standalone,Hadoop YARN,Kubernetes,Apache Mesos和Spark local)
主要实现
LocalSchedulerBackend
MesosFineGrainedSchedulerBackend
CoarseGrainedSchedulerBackend
YarnClientSchedulerBackend
YarnClusterSchedulerBackend
StandaloneSchedulerBackend
...
TaskScheduler是Spark中的底层调度器,目前只被TaskSchedulerImpl具体实现。
跟随SchedulerBackend一起被初始化
TaskScheduler的具体实现TaskSchedulerImpl会通过SchedulerBackend根据不同的集群模式对tasks进行调度
0 条评论
回复 删除
下一页