Spark内部原理
2022-03-29 16:00:06 0 举报
Spark内部原理
作者其他创作
大纲/内容
NodeManager
通信模块RPC
YarnClusterApplication
Task
Thread
Executor
①yarnClient = YarnClient.createYarnClient这个属性非常重要②YarnClient client = new YarnClientImpl();③this.appId = submitApplication()这个就是提交应用程序,这个方法返回一个叫appId,这appId是全局yarn的应用id,后续的应用报告、状态这些都是可以通过appId去查询,所以后面这些代码是对当前应用程序的状态进行操作,以及它的一些执行报告。④下面三个就是客户端启动了,和服务器建立了连接了 launcherBackend.connect() yarnClient.init(hadoopConf) yarnClient.start()⑤准备各种环境,如:创建了应用得到一个响应id、创建容器的启动环境、创建提交的环境注意:/bin/java org.apache.spark.deploy.yarn.ApplicationMaster 这个命令就是在这边封装的⑥yarnClient等于建立了ResourceManager连接,submitApplication等于提交,yarnClient.submitApplication(appContext)
CoarseGrainedExecutorBackend(Executor,jps查看就叫YarnCoarseGrainedExecutorBackend)
1.1 脚本启动执行
6.4 onStart
ThreadPool
YarnRMClient
10. 告知Executor启动成功
3. AM根据参数,启动Driver线程并初始化SparkContext
9. 创建Executor计算对象
YarnClient
用来和ApplicationMaster里面的driver通信的环境
7. 注册Executor
1.4 封装提交参数和命令bin/java ApplicationMaster--jar … --class …①属性rmClient:这个就是ResourceManager,这个就是yarn的资源调度结点protected ApplicationClientProtocol rmClient;
Client
YarnCoarseGrainedExecutorBackend其实是用来做通信的,CoarseGrainedExecutorBackend才是真正的Executor①注册RPC的通信终端、通信的地址、通信的引用②通过driverPropsFetcher的ask发送了一个给Driver的请求,这个消息就是注册我当前的Executor执行器, 然后Driver肯定会收到③接受到Driver发送的true消息,接着在CoarseGrainedExecutorBackend自己发了一条消息告诉自己当前已经注册好了 然后发送执行的LaunchedExecutor的命令:driver.get.send(LaunchedExecutor(executorId))④CoarseGrainedSchedulerBackend中有找到:case LaunchedExecutor(executorId) => 这边接收了LaunchedExecutor然后启动了Executor
①这边就会创建一个分配器createAllocator,这个分配器就是从yarn那边获取的②通过分配器得到allocateResources就是我们可分配的资源,yarn通过allocateResources需要把哪些资源让你用给你返回回去
1.5 提交任务信息submitApplication
driverPropsFetcher
ApplicationMaster(是一个进程,jps查看就叫ApplicationMaster)
6.3 启动Executorbin/java CoarseGrainedExecutorBackend
SparkSubmitArguments
launcherPool
1.2 SparkSubmitArguments解析参数①SparkSubmitArguments这里面就是各种属性的封装②用正则表达式的方式对参数进行解析,通过正则表达式拿到参数名称,通过正则表达式拿到参数值--master --class
6.2 ExecutorRunnalbe(NMClient)①ExecutorRunnalbe里面有nmClient属性,这个就是用来创建与某一个NodeManager的关联②这个关联NodeManager就开始初始化,启动之类的 nmClient.init(conf) nmClient.start() 然后启动Container startContainer()③prepareCommand()这个方法里面封装了YarnCoarseGrainedExecutorBackend就是Executor的通信后台,这个对象里面又封装了CoarseGrainedExecutorBackend这个是Executor真正的进程名称
8. 注册成功
3.1 执行代码3.2 初始化SparkContext①SparkContext里面有private var _schedulerBackend: SchedulerBackend,这个就是通信后台,和driverPropsFetcher发送过来的信息对接②包括Executor总的核数、注册的数量③最后回复了一个true,说明成功了
11.分配任务
6.1 launcherPool(线程池)①new ExecutorRunnable(.......参数).run()
10 任务切分
2 启动ApplicationMaster
span style=\"font-size: inherit;\
5.返回资源可用列表
bin/spark-submit \\--class WordCount \\--master yarn \\--deploy-mode cluster \\./WordCount.jar \\./input ./output
Yarn的ResourceManager
SparkSubmit的伴生对象的main方法①这边就是new一个对象 val submit = new SparkSubmit()②submit.doSubmit(args)这边是执行提交,这个args就是我们上面main传进来的参数③appArgs.action SUBMIT、KILL、REQUEST_STATUS、PRINT_VERSION这边是应用参数的动作,匹配某一个分支,这边默认情况下就是SUBMIT,这个时候就可以提交了④构建一个参数对象new SparkSubmitArguments(args)span style=\"font-size: inherit;\
Driver
ExecutorRunnalbe(NMClient)
SparkSubmit
0 条评论
回复 删除
下一页