Spark driver resource request and job submission sequence diagram
2021-02-25 15:50:50
Spark yarn-cluster resource request and job submission sequence diagram
Outline / Content
Add to the message queue: receivers.offer(data)
receivers:LinkedBlockingQueue[EndpointData]
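The two labels above describe how Spark's Dispatcher enqueues inbound RPC messages: each endpoint has an Inbox, and `receivers` is a `LinkedBlockingQueue` of `EndpointData` that a `MessageLoop` thread drains. A minimal sketch of that pattern (the `EndpointData` and `postMessage` names follow the diagram; the processing logic is a simplified stand-in, not Spark's actual implementation):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class DispatcherSketch {
    // Simplified stand-in for Spark's EndpointData (an endpoint plus its inbox).
    static class EndpointData {
        final String message;
        EndpointData(String message) { this.message = message; }
    }

    final LinkedBlockingQueue<EndpointData> receivers = new LinkedBlockingQueue<>();
    final StringBuilder log = new StringBuilder();

    // postMessage(): enqueue, like receivers.offer(data) in the diagram.
    void postMessage(String msg) { receivers.offer(new EndpointData(msg)); }

    // One iteration of the MessageLoop: take() blocks until a message arrives,
    // then the message is handed to the endpoint's inbox for processing.
    void drainOnce() throws InterruptedException {
        EndpointData data = receivers.take();
        log.append("processed:").append(data.message);
    }

    public static void main(String[] args) throws Exception {
        DispatcherSketch d = new DispatcherSketch();
        d.postMessage("RegisterExecutor");
        d.drainOnce();
        System.out.println(d.log);
    }
}
```

In the real Dispatcher, several MessageLoop threads share one `receivers` queue, which is why a blocking queue (rather than a plain list plus locks) is the natural fit.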
dagScheduler.handleJobSubmitted
extends EventLoop[DAGSchedulerEvent]
YarnRMClient
mainMethod.invoke
yarnClient.start()
run()
new()
prepareCommand() (bin/java CoarseGrainedExecutorBackend)
rpcEnv.setupEndpoint()
Client
new Thread().run()
new DriverEndpoint
processRpcRequest
NettyRpcEnv
createAMRMClient()
YarnClusterApplication
instanceof ResponseMessage
Dispatcher
instanceof RequestMessage
SchedulerBackend
createSparkEnv()
(class)ApplicationMaster
SparkSubmitArguments
DAGSchedulerEventProcessLoop
exec CMD (bin/java org.apache.spark.deploy.SparkSubmit)
prepareSubmitEnvironment(args)
org.apache.spark.launcher.Main.main()-->builder.buildCommand(env)
DriverEndpoint
org.apache.spark.launcher.Main.main
submitApplication()
doOnReceive()
OneWayOutboxMessage
sendWith()
org.apache.spark.deploy.yarn.Client
Thread: DAGSchedulerEventProcessLoop
runJob()
registerAM()
createDriverEndpoint()
driver Thread
runImpl()
AMRMClient (interacts with the RM)
_taskScheduler.start()
org.apache.spark.deploy.SparkSubmit.main
createServer()
handleAllocatedContainers
classForName
request instanceof ChunkFetchRequest || RpcRequest || OneWayMessage || StreamRequest
data.inbox.process()
Dashed arrows indicate threads that start only after Thread.start() is called elsewhere
eventQueue is a blocking queue
This JVM mainly validates the submit arguments
eventQueue: BlockingQueue
case RegisterExecutor
executorRef.send(RegisteredExecutor)
org.apache.spark.deploy.SparkSubmit
NettyRpcEnvFactory
action 算子
onReceive()
createTaskScheduler
new
TransportChannelHandler
TransportClient / TransportResponseHandler / TransportRequestHandler
channelActive / channelInactive
SparkSubmit JVM
request instanceof RequestMessage or ResponseMessage
SparkEnv
DAGScheduler
run() { while (!stopped.get()) ... }
dagScheduler.runJob()
ChannelInboundHandlerAdapter
backend.start()
Dispatcher
create()
take()
CMD(java .....)
getOrCreateParentStages()
createNMClient() → init(conf) → start()
new DAGScheduler
createDriverEnv()
eventQueue.put(event)
createSchedulerBackend
spark-submit.sh
createDriverEndpointRef
MessageLoop: Thread
new Thread(driver)
RpcRequest
spark-class.sh
rmClient
Inbox
createYarnClient
start()
inbox.process()
TransportContext
new YarnRMClient()
TransportClient
send()
runMain
runDriver()
createResultStage()
YarnAllocator
startUserApplication()
client: RegisteredExecutor
Server
postMessage()
(object)ApplicationMaster.main
Spark RPC communication
userThread.setName("Driver")
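The `startUserApplication` labels around this point show that in yarn-cluster mode the ApplicationMaster runs the user class's main on a separate thread renamed "Driver", so the AM thread itself can continue with registerAM and allocateResources. A hedged sketch of just that threading detail (`userMain` is a stand-in for invoking the user's main class, which the real AM does via reflection):

```java
public class DriverThreadSketch {
    static volatile String ranOn = "";

    // Stand-in for userAPP.main; records which thread it ran on.
    static void userMain() {
        ranOn = Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        // The AM wraps the user's main in a thread and names it "Driver",
        // matching userThread.setName("Driver") in the diagram.
        Thread userThread = new Thread(DriverThreadSketch::userMain);
        userThread.setName("Driver");
        userThread.start();
        // The real AM thread would now go on to registerAM()/allocateResources();
        // here we just wait for the user code to finish.
        userThread.join();
        System.out.println(ranOn);
    }
}
```

This is why, in yarn-cluster mode, "the driver" is literally a thread named Driver inside the ApplicationMaster JVM rather than a separate process.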
NettyRpcHandler
font color=\"#000066\
ExecutorRunnable
EndpointData
DriverEndpoint in CoarseGrainedSchedulerBackend
endpoint.receiveAndReply()
1. SparkSubmit // start the process -- main // wrap the arguments -- new SparkSubmitArguments // submit -- submit // prepare the submit environment -- prepareSubmitEnvironment // Cluster mode -- childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
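The flow above ends with SparkSubmit resolving `childMainClass` by name (the diagram's `classForName` and `mainMethod.invoke` steps) and calling its entry point reflectively. A minimal sketch of that reflective hand-off, with `ChildApp` as a hypothetical stand-in for YarnClusterApplication:

```java
import java.lang.reflect.Method;

public class RunMainSketch {
    // Hypothetical stand-in for the resolved child main class
    // (in SparkSubmit this would be YarnClusterApplication).
    public static class ChildApp {
        public static String lastArgs = "";
        public static void main(String[] args) { lastArgs = String.join(" ", args); }
    }

    public static void main(String[] args) throws Exception {
        // classForName: load the child main class from its fully qualified name.
        // In SparkSubmit the name comes from prepareSubmitEnvironment, e.g.
        // "org.apache.spark.deploy.yarn.YarnClusterApplication".
        String childMainClass = RunMainSketch.class.getName() + "$ChildApp";
        Class<?> mainClass = Class.forName(childMainClass);

        // mainMethod.invoke: call the static main with the child arguments.
        Method mainMethod = mainClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) new String[] {"--deploy-mode", "cluster"});

        System.out.println(ChildApp.lastArgs);
    }
}
```

Loading by name is what lets one SparkSubmit binary dispatch to different cluster managers: only the string in `childMainClass` changes between deploy modes.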
new Thread (interacts with the NM)
NMClient
new Thread()
ApplicationMaster JVM
TransportServer(netty)
run() { while (true) ... }
startContainer()
registerApplicationMaster
eventQueue.take()
responseHandler.handle((ResponseMessage) request)
RpcMessage
take() is blocking
getShuffleDependencies()
doRunMain()
runAllocatedContainers
requestHandler.handle((RequestMessage) request)
new SparkSubmitArguments
userAPP.main
case
getOrCreateShuffleMapStage
requestHandler.handle()
NettyRpcHandler
onReceive(eventQueue.take())
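The labels around this point describe the EventLoop pattern behind DAGSchedulerEventProcessLoop: post() does eventQueue.put(event), and a dedicated thread loops on onReceive(eventQueue.take()), where take() blocks until an event arrives. A sketch of that loop (event names here are plain strings for illustration, not Spark's real DAGSchedulerEvent types):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class EventLoopSketch {
    final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    final AtomicBoolean stopped = new AtomicBoolean(false);
    final StringBuilder handled = new StringBuilder();

    // The event-loop thread: while (!stopped.get()) onReceive(eventQueue.take()).
    final Thread eventThread = new Thread(() -> {
        try {
            while (!stopped.get()) {
                onReceive(eventQueue.take());
            }
        } catch (InterruptedException ignored) { }
    });

    // post(): called from other threads, e.g. runJob() -> submitJob().
    void post(String event) throws InterruptedException { eventQueue.put(event); }

    // Stands in for doOnReceive(): match on the event type and dispatch.
    void onReceive(String event) {
        if (event.equals("JobSubmitted")) {
            handled.append("handleJobSubmitted");   // dagScheduler.handleJobSubmitted
        }
        if (event.equals("stop")) {
            stopped.set(true);
        }
    }

    public static void main(String[] args) throws Exception {
        EventLoopSketch loop = new EventLoopSketch();
        loop.eventThread.start();
        loop.post("JobSubmitted");
        loop.post("stop");
        loop.eventThread.join();
        System.out.println(loop.handled);
    }
}
```

Because take() blocks, the loop thread sleeps while the queue is empty instead of spinning, which is why the diagram calls out that take() is blocking.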
startServer()
allocateResources()
SparkContext
submitJob()
TaskScheduler
RpcEnv
YarnClient
register()
new Thread().start() (userAPP)
post()
new ResultStage
parse(args.asJava)