SparkContext初始化
2019-08-29 17:18:06 0 举报
AI智能生成
Sparkcontext初始化
作者其他创作
大纲/内容
ContextCleaner
作用
清除临时数据
初始化代码
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
总结
默认开启
清理RDD,shuffle,Broadcast,accumulator,checkpoint等数据
EventLoggingListener(可选组件)
作用
EventLoggingListener 是将事件持久化到存储的监听器,是 SparkContext 中可选组件
初始化代码
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
总结
spark.eventLog.enabled(默认为 false)
ExecutorAllocationManager(可选组件)
作用
ExecutorAllocationManager可以动态的分配最小Executor的数量、动态分配最大Executor的数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验
初始化代码
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
总结
默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建
start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态的添加、删除Executor
通过不断添加Executor,遍历Executor,将超时的Executor杀死并移除
ShutdownHookManager
作用
ShutdownHookManager的创建是在SparkContext中,为了在Spark程序挂掉的时候,处理一些清理工作。
初始化代码
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
try {
stop()
} catch {
case e: Throwable =>
logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
}
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
try {
stop()
} catch {
case e: Throwable =>
logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
}
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}
总结
释放资源
删除文件
SparkEnv
Spark运行环境
SparkEnv主要创建的对象:
securityManager (安全管理器)
rpcEnv (rpc通信环境)
serializerManager (序列化管理器)
broadcastManager (广播变量管理器)
mapOutputTracker (map输出跟踪器)
shuffleManager (shuffle管理)
memoryManager (内存管理)
blockManager (块管理)
metricsSystem (计量系统)
outputCommitCoordinator (输出提交控制器)
createDriverEnv
createExecutorEnv
create
SparkEnv的使用
Hadoop 相关配置
作用
默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关的配置信息
初始化代码
private var _hadoopConfiguration: Configuration = _
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
总结
将Amazon S3文件系统的AWS_ACCESS_KEY_ID和 AWS_SECRET_ACCESS_KEY加载到Hadoop的Configuration
将SparkConf中所有的以spark.hadoop.开头的属性都赋值到Hadoop的Configuration
将SparkConf的属性spark.buffer.size复制到Hadoop的Configuration的配置io.file.buffer.size
LiveListenerBus
作用
LiveListenerBus是SparkContext中的事件总线。
它异步地将事件源产生的事件(SparkListenerEvent)投递给已注册的监听器(SparkListener)。
Spark中广泛运用了监听器模式,以适应集群状态下的分布式事件汇报。
它异步地将事件源产生的事件(SparkListenerEvent)投递给已注册的监听器(SparkListener)。
Spark中广泛运用了监听器模式,以适应集群状态下的分布式事件汇报。
初始化代码
private var _listenerBus: LiveListenerBus = _
_listenerBus = new LiveListenerBus(_conf)
_listenerBus = new LiveListenerBus(_conf)
总结
LiveListenerBus继承了SparkListenerBus,并实现了将事件异步投递给监听器,达到实时刷新UI界面数据的效果。
工作流程图
图中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件来源,它们都是通过调用LiveListenerBus的post方法将消息交给异步线程listenerThread处理的
1.概述
1.Spark Driver 的初始化始终围绕着 SparkContext 的初始化。
2.SparkContext 是 Spark 中的元老级 A1PI, 从 0.x.x 版本就已经存在。
3.SparkContext组件:
SparkEnv
LiveListenerBus
AppStatusStore
SparkUI
SparkStatusTracker
ConsoleProgressBar
DAGScheduler
SchedulerBackend
TaskScheduler
HeartbeatReceiver
ContextCleaner
JobProgressListener
EventLoggingListener(可选组件)
ExecutorAllocationManager(可选组件)
ShutdownHookManager
4.属性:
creationSite
allowMultipleContexts
startTime
stopped
addedFiles
addedlars
persistentRdds
executorEnvs
sparkUser
checkpointDir
localProperties
_conf
_jars
_files
_eventLogDir
_eventLogCodec
_hadoopConfiguration
_executorMemory
_applicationId
_applicationAttemptId
_listenerBusStarted
nextShuffleId
nextRddId
5.代码
SparkUI
作用
SparkUI 提供了用浏览器访问具有样式及布局并且提供丰富监控数据的页面。
attachTab()方法注册Tab
ExecutorsTab
JobsTab
StagesTab
EnvironmentTab
StorageTab
总结
其采用的是事件监听机制。发送的事件会存入缓存,由定时调度器取出后分配给监听此事件的监听器对监控数据进行更新。
如果不需要SparkUI,则可以将spark.ui.enabled置为false。
主要依赖于流行的Servlet容器Jetty实现
Spark Web UI(Spark2.3之前)Web UI的数据来自度量系统
Spark Web UI(Spark2.3之后)Web UI的数据来自AppStatusStore
spark.ui.killEnabled(在Spark Web UI界面中展示强行杀掉Spark Application Job的开关,默认为true)
AppStatusStore
作用
AppStatusStore提供Spark程序运行中各项监控指标的键值对化存储。Web UI中见到的数据指标基本都存储在这里
初始化代码
private var _statusStore: AppStatusStore = _
_statusStore = AppStatusStore.createLiveStore(conf)
_statusStore = AppStatusStore.createLiveStore(conf)
总结
AppStatusStore是在Spark 2.3.0版本才加入的,一个KVStore包装器,允许跟踪特定类型的元素数量和触发
一旦达到阈值就采取行动。 例如,这允许编写者控制多少数据
通过在添加新数据时可能删除旧数据来存储。
一旦达到阈值就采取行动。 例如,这允许编写者控制多少数据
通过在添加新数据时可能删除旧数据来存储。
AppStatusStore的构造依赖
ElementTrackingStore(为键值对存储KVStore)
AppStatusListener
AppStatusStore的实现依赖InMemoryStore与ElementTrackingStore,数据来源通过AppStatusListener写入
SparkStatusTracker
作用
SparkStatusTracker提供报告最近作业执行情况的低级API。
初始化代码
private var _statusTracker: SparkStatusTracker = _
_statusTracker = new SparkStatusTracker(this, _statusStore)
_statusTracker = new SparkStatusTracker(this, _statusStore)
总结
它的内部只有6个方法,从AppStatusStore中查询并返回诸如Job/Stage ID、活跃/完成/失败的Task数、Executor内存用量等基础数据。
它只能保证非常弱的一致性语义,也就是说它报告的信息会有延迟或缺漏。
SparkStatusTracker负责对Job和Stage的监控,实际也是使用了JobProgressListener中的监控数据,并额外进行了一些加工
ConsoleProgressBar
作用
ConsoleProgressBar显示Stage的进度
初始化代码
_progressBar =
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
总结
通过轮询SparkStatusTracker来获取数据,通过spark.ui.consoleProgress.update.interval设置时长,默认200毫秒
开启
spark.ui.showConsoleProgress=true(默认开启)
日志级别至少是WARN
DAGScheduler
作用
DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据
初始化代码
_dagScheduler = new DAGScheduler(this)
总结
RDD的action操作,将会触发handleJobSubmitted方法
处理的时候,会先创建一个resultStage,每个job只有一个resultstage,其余的都是shufflestage.然后根据rdd的依赖关系,按照广度优先的思想遍历rdd,遇到shufflerdd就创建一个新的stage。
形成DAG图后,遍历等待执行的stage列表,如果这个stage所依赖的父stage执行完了,它就可以执行了;否则还需要继续等待。
最终stage会以taskset的形式,提交给TaskScheduler,然后最后提交给excutor。
HeartbeatReceiver
作用
HeartbeatReceiver是心跳接收器。Executor需要向Driver定期发送心跳包来表示自己存活。它本质上也是个监听器,继承了SparkListener
初始化代码
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
总结
需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,因为Executor将在构造函数中检索“HeartbeatReceiver”
executor每10s心跳一次startDriverHeartbeater(),conf配置spark.executor.heartbeatInterval
executor心跳超时时间120s,conf配置spark.network.timeout
SchedulerBackend
作用
SchedulerBackend是Apache Spark 中任务调度系统的抽象,可以调度来自各种集群管理器的资源(例如内置的Spark Standalone,Hadoop YARN,Kubernetes,Apache Mesos和Spark local)
初始化代码
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_schedulerBackend = sched
主要实现
LocalSchedulerBackend
MesosFineGrainedSchedulerBackend
CoarseGrainedSchedulerBackend
YarnClientSchedulerBackend
YarnClusterSchedulerBackend
StandaloneSchedulerBackend
...
...
TaskScheduler
作用
TaskScheduler是Spark中的底层调度器,目前只被TaskSchedulerImpl具体实现。
初始化代码
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_taskScheduler = ts
_taskScheduler = ts
总结
跟随SchedulerBackend一起被初始化
TaskScheduler的具体实现TaskSchedulerImpl会通过SchedulerBackend根据不同的集群模式对tasks进行调度
0 条评论
下一页