Spark执行流程
2021-10-29 20:59:05 24 举报
spark的全流程
作者其他创作
大纲/内容
SparkContext
4
CoarseGrainedExecutorBackend
14.准备Executor
小段总结:1)ApplicationMaster是进程2)Driver是其中的线程3)先用Driver初始化你的SparkContext环境4)再创建Executor5)再执行你的逻辑代码
分区0 (task0)
new Client
YarnClusterApplication
第151行private def loadEnvironmentArguments(): Unit = { ... ... // action默认赋值submit,这就对应了SparkSubmit第90行的匹配的SUBMIT action = Option(action).getOrElse(SUBMIT) //第227行 }
8.2
ShuffleRDD
5.1
27
分区2
ResultStage
span style=\"font-size: inherit;\
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { ...//调用父类的构造方法}
Executor
Thread
task运行
18
20
使用java脚本启动一个YarnCoarseGrainedExecutorBackend进程用于Executor通信后台
注册应用程序
override def reviveOffers(): Unit = { driverEndpoint.send(ReviveOffers)//自己给自己发送任务消息 }
26
10
Dirver的SparkEnv,Executor也和这个差不多
task
分区0
override def run(): Unit = { val res = task.run(..)//其中包含一个runTask()的而抽象方法,这就代表着每一个task都会进行重写,那就意味着每个task都有自己的计算逻辑}
spark分两条线,一条是准备资源,一条是计算
outbox有几个目标就有几个
7
创建taskSet
ApplicationMaster进程
ShuffledRDD
19
Dispatcher
TaskSet
14
TaskManager
窄依赖
3
11
TaskPool中有多个这样的TaskManager
连接资源和计算两条线
DAGScheduler
21
同时开启两条线程,但计算线程的执行的逻辑是等资源线程完成后才开始执行,有个while循环判断,所以有一个方法一直在等待资源环境是否准备好,另外一个线程就去走Driver执行自己main方法资源这条路,直到executor反向注册才通知计算这条路已经准备好了
SparkSubmit类
RDD
YarnCoarseGrainedExecutorBackend后台进程
def resourceOffers() = { for (currentMaxLocality <- taskSet.myLocalityLevels) {//第462行 ...//选择最有近的executor节点,相当于机架感知,但不一定能将task准确地发送到数据所在的executor,但尽量移动计算不移动数据}
24
15
Client
分区0task0
private def resumeDriver(): Unit = { // 通知程序可以往下走了,也就是执行我们自己的逻辑代码 sparkContextPromise.synchronized { sparkContextPromise.notify() } }
loadEnvironmentArguments()
13.1准备Driver
28
8.1
MessageLoop$DedicatedMessageLoop
9:若有父依赖
第220行prepareSubmitEnvironment(...) { var childMainClass = \"\" ... ... // yarn集群模式 if (isYarnCluster) {//第714行 YARN_CLUSTER_SUBMIT_CLASS=\"org.apache.spark.deploy.yarn.YarnClusterApplication\
Executor的诞生
9.2
集群管理器
ShuffleDependency
宽依赖
17
def run(): Unit = {//第1176行 this.appId = submitApplication()...}
ShuffleMapStage
分区1task1
依赖形成
启动Process(SparkSubmit)
22
终端运行spark-submit,其中%SPARK_CMD%就是你在终端输入的启动命令,然后底层java执行 java -cp org.apache.spark.deploy.SparkSubmit
9
启动JVM虚拟机
分区1(task2)
12
ExecutorRunnable(处于线程池中)所以它属于AppMaster进程中的一个线程,和Driver不是一个线程
def post(event: E): Unit = { if (!stopped.get) { if (eventThread.isAlive) { eventQueue.put(event) //将事件存入队列 ... }
23
开始启动Executor
SparkContext中初始化
反向注册
CoarseGrainedSchedulerBackend(Driver的通信线程)
PairRDDFunctions,它是通过隐式方法从RDD转换过来的
阶段划分
TaskPool
SparkSubmitArguments类
CoarseGrainedSchedulerBackend(Driver的通信线程)CoarseGrainedExecutorBackend(Executor的通信线程)
13.3
Executor启动
20.1
13
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { def getParents(partitionId: Int): Seq[Int] //这里获取的是上游的rdd override def rdd: RDD[T] = _rdd}
MapPartitionsRDD
第108行,这个对象一被new就会执行此方法,因为这相当于在构造器中调用的此方法parse(args.asJava)loadEnvironmentArguments()//第115行
计算和数据的位置存在不同的级别,优先级依次递减1进程本地化:数据和计算在同一个进程中2节点本地化:数据和计算在同一个节点中3机架本地化:数据和计算在同一个机架中4任意
override def onReceive(event: DAGSchedulerEvent): Unit = { doOnReceive(event)//第2152行}
createYarnClient
29
def createContainerLaunchContext()={ val amClass = if (isClusterMode) {//根据yarn的模式(集群或客户端)启动不同的组件 Utils.classForName(\"org.apache.spark.deploy.yarn.ApplicationMaster\").getName } else { Utils.classForName(\"org.apache.spark.deploy.yarn.ExecutorLauncher\").getName } ...//第1000行 Seq(Environment.JAVA_HOME.$$() + \"/bin/java\
分区2(task2)
第432行private def createAllocator(..)={ //这个client就是之前的YarnRMClient allocator = client.createAllocator(...) //第465行 ... allocator.allocateResources() //第479行,接收可用的资源}
inbox在一个RpcEndpoint只有一个
YarnClientImpl
YarnClusterScheduler
10:若有shuffle依赖,没有不执行
private def makeOffers(): Unit = { //取任务操作,task一般是均匀的轮询到每个节点,底层会执行调度算法 scheduler.resourceOffers(workOffers) if (taskDescs.nonEmpty) {//如果任务不为空启动任务 launchTasks(taskDescs)}
第254行def allocateResource()= synchronized{//获取所有可分配的容器(container)val allocatedContainers = allocateResponse.getAllocatedContainers() allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes) // 可分配的容器大于0,才可以进行分配 if (allocatedContainers.size > 0) { ...... // 处理可分配的容器 handleAllocatedContainers(allocatedContainers.asScala) }
这个job中一共5个task
// yarnClient主要用来和RM通信,创建了ResourceManager protected ApplicationClientProtocol rmClient; ... ... public YarnClientImpl() { super(YarnClientImpl.class.getName()); }
9.1
触发job
5.3
分区1
public static YarnClient createYarnClient() { YarnClient client = new YarnClientImpl(); return client; }
//第133行 ,在容器中执行的脚本指令//在另外一个NM上启动Executor的通信后台进程,此时Executor还没产生//注意此时还没有生成Executor,而是启动一个YarnCoarseGrainedExecutorBackend,而它会注册自己private def prepareCommand(): List[String] = {//拼接java指令 ... ... YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$$() + \"/bin/java\
//在SparkContext的初始化完成后会调用这个方法_taskScheduler.postStartHook()
19.1
def run(..)={ driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)//第303行,创建一个driver和Driver进行通信 val env = SparkEnv.createExecutorEnv(...)//第331行,建立一个Executor的通信环境 //backendCreateFn就是用于生成CoarseGrainedExecutorBackend对象的方法,并注册为通信终端,并在后面inbox中通过添加OnStart激活,等于是在当前通信环境安装一个通信终端 env.rpcEnv.setupEndpoint(\"Executor\
30反向注册
Dependency
执行main函数
RpcEndpointRef类主要用于发送数据def sendef ask
第985行override def main(args: Array[String]): Unit = { submit.doSubmit(args) //执行提交流程}
发送任务
第63行def run(): Unit = { logDebug(\"Starting Executor Container\") ////创建与某个nmClient与NM关联 nmClient = NMClient.createNMClient() nmClient.init(conf)//初始化 nmClient.start()//建立连接 startContainer()//启动Executor Container }
NodeManager
CoarseGrainedSchedulerBackend,在初始化SparkContext的时候就被声明了,Driver的通信线程被_taskScheduler.start()时启动
5
阶段总结:1)先判断RDD依赖2)再划分阶段3)再根据每个阶段的最后RDD的分区数划分任务4)任务调度FIFO5)任务分发执行
31
RM选择一个NM运行传过来的指令启动ApplicationMaster进程bin/java org.apache.spark.deploy.yarn.ApplicationMaster或者bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
第263行final def run(): Int = { ... ...//若是集群模式就运行Driver if (isClusterMode) { runDriver() } else{ runExecutorLauncher() }
CoarseGrainedExecutorBackend(Executor通信线程)
OneToOneDependency
执行到action算子
阶段总结:1)若没有shuffle过程,程序至少有一个ResultStage阶段2)若有shuffle过程,阶段数=shuffle依赖数+13)RDD中每个分区都会单独形成一个task
RpcEndpoint类主要用于接收数据def receivedef receiveAndReply
ThreadPool
13.4
6
HadoopRDD
YarnAllocator
1
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = { val serializedTask = TaskDescription.encode(task)//将任务序列化,编码 for (task <- tasks.flatten) { //循环取出任务,若任务数量没超过其限制,没超过则找到对应executor终端给其发送消息启动task执行 //而这个executor终端就是根据本地化级别选出来的 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))//第368行 }}
任务划分
override def postStartHook(): Unit = { //唤醒runDriver中的awaitResult()就是唤醒等待SparkContext的那个线程,让它往下执行去创建资源环境 ApplicationMaster.sparkContextInitialized(sc) super.postStartHook()//又开启一个等待线程等待程序往下走,而它需要runDriver中的resumeDriver来唤醒,唤醒后就可以执行自己的逻辑代码了 logInfo(\"YarnClusterScheduler.postStartHook done\") }
bin/spark-submit \\--class org.apache.spark.examples.SparkPi \\--master yarn \\--deploy-mode cluster \\./examples/jars/spark-examples_2.12-3.0.0.jar \\10
5.2
25
相当于这里的获取的rdd就是它
NettyRpcEnv
16
任务提交
Driver线程的诞生
8.3
EventLoop
TaskSchedulerImpl
重写
prepareSubmitEnvironment(args)
8
任务调度
private val yarnClient = YarnClient.createYarnClient//创建yarn客户端
7.1
stage划分
Driver运行
parse(args.asJava)
parseArguments(args)
2
13.2
第97行protected def parseArguments(args: Array[String]){ new SparkSubmitArguments(args) //第98行,构建一个对象}
0 条评论
下一页