Spark Job Execution
2024-04-17 17:20:00
Flow diagrams describing parts of Spark job execution, for reference only
Outline/Content
WorkerOffer
task
run()
runJob() -> split into stages and submit
handleAllocatedContainers()
YarnClusterSchedulerBackend (Executor)
Client
Inbox
runDriver()
ApplicationMaster
runAllocatedContainers() { new ExecutorRunnable().run()}
Driver thread starts
6. Start Executor
allocate()
main()
YarnClientSchedulerBackend
2. Start ApplicationMaster
startUserApplication()
receiveLoopRunnable
setupEndpoint()
ExecutorBackend
Notify the main thread
Dispatcher
taskRunner.run()
waitBackendReady
awaitTermination(): Unit = { shutdownLatch.await() }
SparkSubmit
Spark job submission flowchart
startContainer()
yarnClient
runMain()
ResourceManager
StatusUpdates
inbox
8. Registration succeeded
SparkContext
Executor startup and execution
Executor side
main thread runDriver()
YarnClusterApplication
awaitTermination()
Executor
Driver thread userThread
YarnClusterSchedulerBackend
SchedulerBackend
ExecutorRunnable
3. Start the driver thread and initialize SparkContext
AMRMClient
CoarseGrainedSchedulerBackend
launchTask() { val tr = new TaskRunner(); threadPool.execute(tr) }
stage1
Scheduling flow
Register the AM, request resources, and launch Executors
process{ case RpcMessage: case OneWayMessage: case OnStart: case OnStop:}
Program execution
submitApplication()
Notify the driver thread to start
NodeManager
9. Create the Executor compute object
amClient.allocate()
createAllocator()
SparkSubmitArguments: parses and wraps spark-submit arguments
7. Register Executor
ThreadPool
TaskScheduler
splitPendingAllocationsByLocality()
spark-submit job submission
MessageLoop
5. Return the list of available resources
updateResourceRequests()
SparkContext initialization
start()
sparkContextInitialized()
YarnRMClient
YarnCoarseGrainedExecutorBackend
YarnSchedulerBackend
runAllocatedContainers()
LocalityPreferredContainerPlacementStrategy
stage3
Driver
SchedulerBackend inheritance hierarchy (spark-core + spark-yarn)
4. Register the AM and request resources
stage2
DAGScheduler
doSubmit()
receive{ case RegisteredExecutor: case LaunchTask: case KillTask: case StopExecutor: case Shutdown:}
YarnAllocator
Computation finished, thread shuts down
StandaloneSchedulerBackend
resumeDriver()
Task scheduling
LaunchTask
job
allocateResources()
Cleanup: unregister, cleanupStagingDir
Yarn Cluster
NettyRpcEnv
submitTasks()
Driver side
LocalSchedulerBackend
localityOfRequestedContainers()
1. submitApplication
receiveLoop(): Unit = { while (true) { val inbox = active.take(); inbox.process(dispatcher) } }
registerAM()
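The Dispatcher / MessageLoop / Inbox labels and the receiveLoop() snippet in this outline describe a queue-driven message loop: a thread blocks on a queue of active inboxes, takes one, and lets it process its pending messages. The following is a minimal, self-contained sketch of that pattern, not Spark's actual implementation. The names Inbox, MessageLoop, receiveLoop, receiveLoopRunnable, OnStart, OnStop and OneWayMessage are borrowed from the diagram; the simplified message types, the PoisonPill shutdown marker, the post/stop methods, and MessageLoopDemo are assumptions made for illustration.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Simplified message types, named after the cases shown in the diagram.
sealed trait Message
case object OnStart extends Message
case class OneWayMessage(body: String) extends Message
case object OnStop extends Message

// An inbox buffers messages for one endpoint and hands them to a handler in FIFO order.
final class Inbox(val name: String, handler: Message => Unit) {
  private val messages = new LinkedBlockingQueue[Message]()

  def post(m: Message): Unit = messages.put(m)

  // Handle everything currently queued; returns the number of messages handled.
  def process(): Int = {
    var handled = 0
    var m = messages.poll()
    while (m != null) {
      handler(m)
      handled += 1
      m = messages.poll()
    }
    handled
  }
}

// A single-threaded loop that blocks on a queue of inboxes that have pending work.
final class MessageLoop {
  private val active = new LinkedBlockingQueue[Inbox]()
  // Hypothetical shutdown marker: taking it from the queue ends the loop.
  private val PoisonPill = new Inbox("poison-pill", _ => ())
  private val pool = Executors.newSingleThreadExecutor()

  private val receiveLoopRunnable = new Runnable {
    override def run(): Unit = receiveLoop()
  }
  pool.execute(receiveLoopRunnable)

  // Queue the message, then mark the inbox as active so the loop picks it up.
  def post(inbox: Inbox, m: Message): Unit = {
    inbox.post(m)
    active.put(inbox)
  }

  private def receiveLoop(): Unit = {
    var running = true
    while (running) {
      val inbox = active.take() // blocks until some inbox has work
      if (inbox eq PoisonPill) running = false else inbox.process()
    }
  }

  // Let already-queued inboxes drain, then stop the loop thread.
  def stop(): Unit = {
    active.put(PoisonPill)
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object MessageLoopDemo {
  // Posts three messages to one inbox and returns them in the order they were handled.
  def run(): List[String] = {
    val seen = ListBuffer.empty[String]
    def record(s: String): Unit = seen.synchronized { seen += s; () }

    val loop = new MessageLoop
    val inbox = new Inbox("executor", {
      case OnStart          => record("start")
      case OneWayMessage(b) => record(b)
      case OnStop           => record("stop")
    })

    loop.post(inbox, OnStart)
    loop.post(inbox, OneWayMessage("hello"))
    loop.post(inbox, OnStop)
    loop.stop()
    seen.synchronized { seen.toList }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because a single thread drains each inbox in FIFO order, the handler sees messages in the order they were posted. A real RPC layer would also need to handle concurrent handlers, error cases, and endpoint lifecycle, none of which this sketch attempts.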