Spark源码流程
2022-03-23 16:50:52 1 举报
spark源码流程
作者其他创作
大纲/内容
进入org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
def submitApplication(): ApplicationId = { ResourceRequestHelper.validateResources(sparkConf) var appId: ApplicationId = null try {① //这下面三个就是客户端启动了,和服务器建立了连接了 launcherBackend.connect() yarnClient.init(hadoopConf) yarnClient.start()同样下面有一段代码:② //这边就是告诉我们的ResourceManager我要创建一个应用val newApp = yarnClient.createApplication()val newAppResponse = newApp.getNewApplicationResponse()//创建了应用得到一个响应idappId = newAppResponse.getApplicationId()在下面又有一段代码:③ 创建容器的启动环境:val containerContext = createContainerLaunchContext(newAppResponse)④ 创建提交的环境:val appContext = font color=\"#ff0000\
setupEndpoint进入然后ctrl+H找到org.apache.spark.rpc.netty.NettyRpcEnv#setupEndpoint
进入ApplicationMasterArguments后下面有parseArgs(args.toList)就是解析参数,比如:--jar,--class,-arg其实就是我们的命令行参数,一直在传,但是用不上
回到:org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
找到:if (isClusterMode) {//如果是集群模式,我们就是进入这个模式 runDriver()//进入} else {//如果是client模式就到这边runExecutorLauncher()}
进入createYarnClient
再进入YarnClientImpl
这个main方法就是我们自己程序的,然后会执行到我们的这段代码准备环境val sparConf = new SparkConf().setMaster(\"local\").setAppName(\"WordCount\
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive{找到:这边接收了LaunchedExecutor然后启动了case LaunchedExecutor(executorId) =>//增加核数 executorDataMap.get(executorId).foreach { data => data.freeCores = data.totalCores }//做一些操作 makeOffers(executorId)}到了这边整个环境才就ok了(Driver、Executor、ResourceManager、NodeManager都有了,剩下的就是你的一些作业的功能,任务的划分,阶段的划分之类的)
containerContext是container启动环境发现:就是启动环境,本地的一些东西还有一个java虚拟机的配置的什么的private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) : ContainerLaunchContext = { logInfo(\"Setting up container launch context for our AM\") val appId = newAppResponse.getApplicationId//这段代码一直往下走看到 这个代码又启动了一个程序//这个指令会被封装一下放到下面的容器中val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$$() + \"/bin/java\
注意maven中要加才能找到这个YarnClusterApplication类:<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-yarn_2.12</artifactId> <version>3.0.0</version></dependency>
进入org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint
回到org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive
def run(): Unit = { logDebug(\"Starting Executor Container\")//线程池里面有ExecutorRunnable,这个ExecutorRunnable里面有nmClient属性,这个就是用来创建与某一个NodeManager的关联, nmClient = NMClient.createNMClient()//那么这个关联NodeManager就开始初始化,启动之类的 nmClient.init(conf) nmClient.start()//然后启动Container startContainer()进入}
send发送消息,org.apache.spark.executor.CoarseGrainedExecutorBackend#receive接收消息
我们当前的这个CoarseGrainedExecutorBackend对象就是一个RpcEndpoint,那么CoarseGrainedExecutorBackend就要遵循生命周期意味着当你把收件箱放一个onStart之后我们的CoarseGrainedExecutorBackend就应该可以收到消息,CoarseGrainedExecutorBackend能收到消息的话这边就有一个onStartorg.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
进入①parseArguments方法
进入submitApplication
注意⑤这边
到⑤这边
进入run方法
进入⑥ resumeDriver()方法
到③结束,这边我们的代码和上下文环境准备好之后就会去唤醒那边的线程
到④这边
进入到org.apache.spark.deploy.yarn.ExecutorRunnable#run
进入ApplicationMaster
进入① org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
private def resumeDriver(): Unit = { // When initialization in runDriver happened the user class thread has to be resumed. sparkContextPromise.synchronized {//这个Driver就会去通知notify,通知以后就会继续往下走了 sparkContextPromise.notify() }}到了这里对于我们来说现在我们就可以让Driver程序继续往下,Driver程序继续往下就意味着后面的我们的逻辑代码就可以开始执行了,后面就开始我们的各种任务阶段之类的划分了。
找到ApplicationMaster中的main方法:① //把我们的命令行参数做一个封装 val amArgs = new ApplicationMasterArguments(args)进入 ② //创建Yarn配置以及创建ApplicationMasterval yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))master = new font color=\"#ff0000\
这边最重要的是这个参数mainClass = Utils.classForName(childMainClass)是怎么来的,来自这边:prepareSubmitEnvironment(args)发现:var childMainClass = \"\",我们往下找到给childMainClass赋值的代码:childMainClass = YARN_CLUSTER_SUBMIT_CLASS,点击这个参数发现:YARN_CLUSTER_SUBMIT_CLASS = \"org.apache.spark.deploy.yarn.YarnClusterApplication\"
进入② submit方法
这个对象里面有个方法private[spark] class ClientArguments(args: Array[String]) {这个方法是parseArgs,这个方法解析了各种参数(这些参数就是我们外部传进入的参数) private def parseArgs(inputArgs: List[String]): Unit =
进入② 发送消息给org.apache.spark.SparkContext#_schedulerBackend
搜索org.apache.spark.deploy.yarn.ApplicationMaster
发现:new SparkSubmitArguments(args)//构建一个参数对象
start方法被⑤调用了
回到
进入ClientArguments
def run(): Unit = {//这个就是提交应用程序,这个方法返回一个叫appId,这appId是全局yarn的应用id,后续的应用报告、状态这些都是可以通过appId去查询,所以后面这些代码是对当前应用程序的状态进行操作,以及它的一些执行报告。 this.appId = submitApplication
发现:这个val font color=\"#ff0000\
进入run
这边的Inbox是说我们当前的结点里面会有一个Inbox,这个Inbox可以往里面发送消息,往下找:@GuardedBy(\"this\")protected val messages = new java.util.LinkedList[InboxMessage]()inbox.synchronized { messages.add(OnStart)//发送消息}这个消息就叫OnStart,这个OnStart发给自己,发给自己告诉自己要做一件事情叫OnStart事情,,怎么理解呢?找到private[spark] trait RpcEndpoint 这个就是通信的一个终端,用来做通信的,这个就是通信的一个终端,用来做通信的,这个通信的环境有一个生命周期{@code constructor -> onStart -> receive* -> onStop}这个生命周期表述的是四个阶段这边就有一个onStart
进入runAllocatedContainers
val allocateResponse = amClient.allocate(progressIndicator)//可分配的容器val allocatedContainers = allocateResponse.getAllocatedContainers()//如果可分配的容器大于0就可以进行分配了if (allocatedContainers.size > 0) { logDebug((\"Allocated containers: %d. Current executor count: %d. \" + \"Launching executor count: %d. Cluster resources: %s.\
@Publicpublic static YarnClient createYarnClient() { YarnClient client = new YarnClientImpl(); return client;}
① //这个方法里面就是把我们可分配的容器做一个分类,因为比方说我们需要20个容器,那这20个容器之间什么关系,这个方法会把他整理一下,比如说是不是在同一台主机上,这边就涉及到我们之前说的首选位置def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = ........一大堆代码都是分配好容器② //在当分配好容器之后到这个方法下面:runAllocatedContainers(containersToUse)进入
SparkSubmitArguments类,这里面就是各种属性,往下找到parse(args.asJava)这个是把命令行参数进行解析,进入找到:action = Option(action).getOrElse(SUBMIT)判断action是否有值,如果没有值就给一个SUBMIT,把SUBMIT传给action
④中的代码就是之前有提到的判断mainClass是否继承了SparkApplication,是的情况下用反射来创建YarnClusterApplication对象 val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]} else { new JavaMainApplication(mainClass)}
往下找://这个org.apache.spark.executor.YarnCoarseGrainedExecutorBackend就是Executor的通信后台Seq(\"org.apache.spark.executor.YarnCoarseGrainedExecutorBackend\
① //这段代码启动用户的应用程序,这边要准备好上下文环境对象,下面才能继续走,这个方法非常重要 userClassThread = font color=\"#ff0000\
执行好我们的代码之后:这边的阻塞就有放开继续往下走了。
进入⑤createAllocator方法
进入org.apache.spark.executor.CoarseGrainedExecutorBackend#run
//这个构造方法往上找public YarnClientImpl() { super(YarnClientImpl.class.getName());} //找到属性rmClient:这个就是ResourceManager,这个就是yarn的资源调度结点protected ApplicationClientProtocol rmClient;
① //用到了类加载器,这个类加载器会来自args.userClass的类,点击args.userClass这个之后会发现这个类就是我们命令行参数传进来的类名称val mainMethod = userClassLoader.loadClass(args.userClass)② //从我们指定的类中找到main方法 .getMethod(\"main\
进入org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
进入spark-submit文件发现:exec \"${SPARK_HOME}\"/bin/spark-class org.apache.spark.deploy.SparkSubmit \"$@\"当你去启动一个Java虚拟机执行一个进程,一个类的时候,它会走SparkSubmit.main方法拷贝org.apache.spark.deploy.SparkSubmit到idea源码里面找到它的伴生对象注意:你用jsp查看spark的进程也是叫SparkSubmit
进入org.apache.spark.deploy.SparkSubmitArguments
搜索这个org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
回到:org.apache.spark.SparkContext#_schedulerBackend这个就是通信后台,正好和上面的发送过来的信息对接 private var _schedulerBackend: SchedulerBackend进入 = _到这边
所以接下来我们进入org.apache.spark.deploy.yarn.YarnClusterApplication
执行一个Spark程序从提交应用开始:bin/spark-submit \\--class org.apache.spark.examples.SparkPi \\--master yarn \\--deploy-mode cluster \\./examples/jars/spark-examples_2.12-3.0.0.jar \\ 10
进入startUserApplication到这边:
进入① prepareSubmitEnvironment(args)方法
进入Client
进入SparkSubmit
进入org.apache.spark.rpc.netty.Inbox
进入runDriver方法
进入SchedulerBackend,然后Ctrl+H找到集群模式org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
进入到org.apache.spark.rpc.netty.DedicatedMessageLoop
private val client = new YarnRMClient()进入
进入②runMain方法
进入handleAllocatedContainers处理和分配的容器
可以看到这createApplicationSubmissionContext里面只是一些参数什么的
进入④ createApplicationSubmissionContext
进入③ createContainerLaunchContext
回到:CoarseGrainedExecutorBackend
0 条评论
回复 删除
下一页