Spark初始化和任务调度
2021-08-16 10:10:26 32 举报
Spark初始化和任务调度
作者其他创作
大纲/内容
scheduleExecutorsOnWorkers
Task
执行Job-8-向executor发送LaunchTask消息,在executor上启动task
启动Executor
onStart
注册Driver-11-返回Driver启动成功
发出DriverState给Worker
prepareAndRunDriver
注册APP-2
runMain
创建调度池
submitMissingTasks
说明2.RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化------------------------------------------------说明4创建tasks原则1.根据stage的类型进行匹配,为每个partition创建一个task ShuffleMapStag-->ShuffleMapTask ResultStage-->ResultTask2.将之前准备好的task最佳位置,taskBinary等其他信息作为参数传入,创建Task3.最后加入到tasks中------------------------------------------------5.即tasks.size大于0,则为stage中的task,创建TaskSet对象,调用taskScheduler的submitTasks方法,提交TaskSet给TaskScheduler
1.创建工作目录2.组成一些参数3.生成ProcessBuilder4.启动本地进程4.重定向输出流文件5.等待退出状态吗6.向worker发送状态改变
注册Driver-10-Drive状态变更
getMissingParentStages
注册APP-5-返回状态更新
Master
SparkContext
Driver
6)创建和启动DAGScheduler;
launchTask
allocateWorkerResourceToExecutors
执行Job-15
ProcessBuilder
注册APP-11-创建
9)启动metricsSystem测量系统10)创建和启动Executor分配管理器 ExecutorAllocationManager: 基于工作负载动态分配Executor和删除 Executor的代理11)ContextCleaner 的创建与启动 ContextCleaner 用于清理那些超出范围的RDD等等12)设置并启动监听总线ListenerBus13)spark 环境更新14)向事件总线投递Application启动15)向度量系统注册DAGSchedulerSource,BlockManagerSource,以及executorAllocationManagerSource16)将SparkContext标记为激活
状态处理:向master发送消息本地:1.移除本地drivers集合2.移除dirver的资源3.添加到完成drivers集合
1.封装DriverRunner启动线程2.启动线程执行
执行Job-1-提交Job: 放入DAGSchedulerEventProcessLoop 队列继承EventLoop内部是消息队列LinkedBlockingDeque,while循环处理
5)创建任务调度器TaskScheduler;
启动类
注册APP-3
1.创建driver,waitingDrivers2.shuffle选择worker3.计算资源是否满足4.在worker上调度driver: worker.endpoint.send(LaunchDriver)
注册Driver-9-在系统中执行命令,启动一个进程
注册APP-13-发送executor注册状态
执行Job-10
APPClient
注册Driver-4
handleJobSubmitted
DagScheduler
脚本启动
注册APP-14
接受发送请求处理
注册APP-8-返回APP状态
注册driver到Master
注册Driver-5
获取spark属性信息
注册APP-12
执行Job-16-更新task的运行状态
removeDriver
--------------StatusUpdate-----------更新task状态:task的状态为isFinished,将executor 空闲cpu信息更新,调用makeOffers将资源释放
CoarseGrainedSchedulerBackend
注册APP-4-注册自身listener
执行模块
1.必须进行初始化SparkConf和SparkContext,不然就不是spark程序了
执行Job-14
TaskScheduler
1.加载默认配置2.获取API的配置
ExecutorRunner
注册APP-8
注册Driver-12
4
2)创建并初始化Spark UI;3)Hadoop 相关配置及Executor 环境变量的设置;4)创建心跳接收器;和Executor交互
注册APP-1-ask分配集群资源
handleDriverStateChanged
执行Job-9
1)创建Spark Env:1.拷贝SparkConf2.创建DriverEnv获取其信息比如地址等
Worker
执行Job-13-TaskRunner.run1.Spark运行过程中这些日志2.调用CoarseGrainedExecutorBackend的statusUpdate方法,更新状态3.执行准备:反序列化,设置本地参数,设置TaskMemoryManager等4.调用Task的run方法5.将task运行时间指标数据添加到metrics度量系统6.将task直接结果写到blockManager,存储级别为MEMORY_AND_DISK_SER7.调用CoarseGrainedExecutorBackend的statusUpdate更新状态方法,将task状态更新为已完成
1,获取sparkConf2.创建DriverEndpoint: DriverEndpoint用于提交task到 Executor,接收Executor返回的计算结果
RpcEnv
Spark-Submit.sh
1.选择调度算法(平均,还有个顺序分把一个分worker完在分第二个)2.返回给每个worker分配好的core数组
Executor
实际提交用户应用程序
消息处理模块-----------发送Executor注册---------------- 向Driver发送RegisterExecutor消息-----------接受RegisteredExecutor----------- 创建Executor,由Executor实现大部分功能-----------启动Task--------------------1.反序列化task2.调用executor的launchTask
SparkSubmit
移除dirver:1.用drivers集合删除2.添加到completedDrivers集合3.更新为finalStat4.调用schedule()方法
模式匹配:local,yarn初始化 1.MasterUrl获取 2.创建SchedulerBackend:StandaloneSchedulerBackend 等 3.initialize初始化方法:创建调度池,设置调度模式 FIFO/FAIR
driverEndpointRef
注册Driver-1
注册Driver-3
注册APP-6
注册Driver-8-构建
注册Driver-2
submitJob
1.在下jar包,创建driver目录2.创建CommandUtils.buildProcessBuilder3.运行driver:输出文件重定向,格式化等
run
ProcessBuilderLike
LaunchExecutor
1.worker需要分配executor数量2.为application添加executor3.启动executor4.等待返回结果4.向driver发送消息
注册Driver-7
submitStage
prepareSubmitEnvironment
Application模块
start
1.runJob2.dagScheduler.runJob
---------------接受并反馈-----------------接受executor的注册请求1.判断executor是否重复注册2.添加内存缓存,向hashMap中添加executor id与rpc Address信息3.向totalRegisteredExecutors中 加14. 将executorId与data信息添加到executorDataMap中5.向CoarseGrainedExecutorBackend发送消息Executor注册成功6.向listenerBus投递SparkListenerExecutorAdded事件------------------只接受---------------------------执行Job-7-reviveOffers请求----------------1.调用launchTasks:提交Tasks任务2找到对应的executor3.序列化task4.向executor发送LaunchTask消息,在executor上启动task
Cluster模式: 注册driver到Masterclient模式: 直接调用用户编写的主方法
1.先查是否aliveMaster2.创建executor工作目录2.为executor创建本地目录3.executor在环境变量中获取SPARK_EXECUTOR_DIRS配置4.worker将接受到的信息,封装成ExecutorRunner对象5.启动Runner6.向Master报告executor的状态
执行Job-11-给每个Task都会创建一个TaskRunner,TaskRunner继承自Runneralbe,TaskRunner的run方法中,会运行task执行Job-12-将task添加到runningTasks集合中,标记为task正在运行3.将TaskRunner被放到一个线程池中执行
CoarseGrainedExecutorBackend
Driver模块
注册APP-7
注册APP-8-返回executor启动成功
7)启动TaskScheduler8)初始化块管理器BlockManager;
font color=\"#009900\
SparkConf
注册APP-10-创建
submitTasks方法
调用具体Task的实现类:举例ShuffleMapTask1.记录运行时间,GC时间2.反序列化3.得到ShuffleWriter,后期会关注shuffle源码解读4.调用rdd的iterator5.最后调用RDD的实现类:这里是: MapPartitionsRDD的computefont color=\"#ff3333\
启动Driver
注册Driver-6-启动线程
封装参数资源管理器选择:yarn等部署模式:client/cluster
SchedulerBackendStandaloneAppClientListener一个类两个功能
receive
注册APP-9-Shell调用,启动CoarseGrainedExecutorBackend进程
DriverRunner
执行Job
rootPool
执行Job-6-提交TaskSet到TaskScheduduler的submitTasks
spark.deploy.Client
1)组装参数driver地址,executor的配置包括内存和coreshu等2.构建ApplicationDescription3.构建APPClient4.注册状态更新5更新状态为运行
1.检查Master状态2.根据传过来的参数生成一个ApplicationInfo3.加入缓存4.加入等待队列5.持久化APP6.向drive发送状态消息7.进行调度
0 条评论
下一页