马中华--Spark任务执行流程分析
2020-06-28 10:02:11 1 举报
Spark任务执行过程源码分析
作者其他创作
大纲/内容
driver.send(RegisteredApplication(appID)
第一件事:初始化TaskScheduler
当 Worker 接收到 LaunchExecutor 消息的时候,把 Executor 的信息都封装在一个 ExecutorRunner 的对象中,调用 start() 方法启动Executor,并且在 Executor 内部初始化一个 ThreadPool 用来执行 Task
spark-submit --class XXX /a/b/cc.jar
Worker 启动好了 Executor 之后, 该 Executor 会自动的向 对应的 APP 的 Driver 程序进行反向注册,证明自己已经启动好了
第二件事:初始化StandaloneSchedulerBackEnd
eventProcessLoop = new DAGSchedulerEventProcessLoop()
registerApplication(app)
3
7
Task 有两种类型: ShuffleMapTask 和 ResultTask
当 ClientEndpoint 启动之后,会将用户提交的任务和相关参数,封装到 ApplicatoinDescription 中提交给 Master 进行任务注册, 注册对象为:RegisterApplication,包装了ApplicatoinDescription
submitStage(finalStage)
2
JobSubmitted
eventThread.start()
RegisterApplication
第二条主线:spark application 在初始化完了之后,执行 action 操作之后执行的 Task 执行Spark的应用程序 Application 执行 action算子 --> sc.runJob() --> dagScheduler.runJob() --> dagScheduler.submitJob() --> eventProcessLoop.post(JobSubmitted)
ReviveOffers
DriverEndpoint
当 Executor 接收到 LaunchTask 消息的时候,就把每个 Task 封装成 TaskRunner 然后提交给 线程池执行。
第四件事:初始化DAGSchedulerEventProcessLoop
第六件事:初始化StandaloneAppClient
TaskSchedulerImpl
当执行 Spark Application 的 action 算子的时候,会进行层层递进的提交,直到最后,提交一个 JobSubmitted 事件给 DAGScheduler 内部的事件处理器:DAGSchedulerEventProcessLoop
提交任务
backend.reviveOffers()
DAGSchedulerEventProcessLoop 的内部维护了一个 eventQueue 的事件队列。将来 Job 任务的提交事件 JobSubmitted 或者 JobCancelled 提交过来放在 eventQueue,然后 DAGSchedulerEventProcessLoop 会从该队列中获取事件然后执行处理
运行SparkSubmit的main方法,在这个方法中,通过反射的方式创建我们自己编写的主类,然后调用main() 方法,然后开始执行我们自己的代码,注意:Spark应用程序的 Driver 就是运行在 SparkSubmit 中
接收到 TaskSet 调用 backEnd 的 reviveOffers() 启动任务的执行
Task
StandaloneSchedulerBackEnd
executor.launchTask()
Master 通过计算调度资源,将资源调度相关的信息,封装在 LaunchExecutor 对象中 发送给 对应的 Worker
LuanchTask
第七件事:初始化ClientEndpoint
当 SparkContext 初始化好了之后,就把 jar 包交给 DAGScheduler,然后划分为多个 Stage,把每个 Stage 再提交给 TaskScheduler 变成 TaskSet,然后由 DriverEndPoint 发送给 Executor运行
Task 分为 ShuffleMapTask或者ResultTask
dagScheduler.doOnReceive(event)
handleJobSubmitted()
当 Worker 把 executor 启动成功之后,就会给 Master 返回一个消息,消息内容封装在 ExecutorStateChanged 对象中。该对象中主要包含 APPID,execID
TaskRunner
任务注册对象,封装了 Driver 提交 job 到 Master 的时候所需要的各种信息。
DAGScheduler
5
第一条主线:spark application 的提交和初始化过程spark-submit --> SparkSubmit --> main() --> submit() --> doRunMain() --> runMain() --> 通过反射创建我自己的主类对象,然后调用main() 启动 --> 执行我们的代码 --> 初始化 SparkContext对象 --> 创建RDD --> 触发Action算子 --> 提交Job --> worker执行任务 --> 任务结束
ApplicationDescription
CoarseGrainedExecutorBackend
ClientEndpoint
ExecutorStateChanged
当 master 接收到 clientEndpoint 提交的任务请求的时候,会将请求所有参数等相关信息,封装在 Application 中。然后将其进行持久化,然后,添加到 waitingApps 的 应用程序待执行队列中,任务队列,默认按照 FIFO 的方式进行执行。
第三件事:初始化DAGScheduler
DAGScheduler 的作用,就是 RDD 的依赖关系链,按照宽依赖划分成多个 Stage。DAGScheduler 在启动的时候,会去初始化和启动一个叫做 DAGSchedulerEventProcessLoop 的事件处理器, 用来接收和处理将来 job 的提交
RegisteredApplication
4
waitingApps += app
RegisterExecutor
EventLoop.onReceive(event)
Client
Master
Application 注册成功之后, Master 会返回给 ClientEndpoint 一个消息,最重要的是 applicationID, 封装在 RegisteredApplication 对象中
注册成功之后,就会 开始 执行 schedule() 方法调度资源,发命令让 对应的 worker 启动 executor 进程。
eventProcessLoop.start()
schedule()
Executor
Worker
DroverEndPoint 依次调用 makeOffers() 方法和 launchTasks() 方法来提交任务
LaunchExecutor
当代码运行到 new SparkContext 的时候,执行 sparkContext 的初始化。SparkContext执行初始化的运行,会初始化三个重要的对象,依次初始化:TaskSchedulerImpl,StandaloneSchedulerBackEnd,DAGScheduler
1
Driver 程序通过 DriverEndPoint 向 Worker 提交任务, 任务消息封装在 LauchTask中,由该 Worker 中的对应的 Executor 负责具体执行
Spark 的应用程序 Applicatoin 在 Spark 的 StandAlone 集群中的运行流程主要分为两条主线:第一就是运行之前的初始化运行环境的过程,第二就是运行过程。
SparkSubmt
6
第五件事:初始化DAGScheduler
把 driver 提交过来的 application 添加到一个 等待队列 waitingApps 中
SparkContext
taskScheduler.submitTasks(TaskSet)
StandaloneAppClient
ThreadPool
收藏
0 条评论
下一页
为你推荐
查看更多