DolphinScheduler Master核心代码流程
2022-11-29 15:30:04 1 举报
Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用
作者其他创作
大纲/内容
executeProcess()
循环从队列中消费数据,分发任务
TaskPriorityQueueConsumer.java@PostConstructsuper.start()
masterRegistry.registry()
path是否为空
Y
N
Master容错处理:查询到需要容错的流程实例,将实例上下文补充后进行容错处理Worker容错处理:查询到需要容错的任务实例,将任务上下文补充后进行容错处理
processInstanceId是否为0
cmdParams是否存在ProcessInstanceId的key
在Spring的Bean生命周期中,实例化后会调用afterPropertiesSet方法进行一些额外操作,只需要实现InitializingBean接口即可。
1. 如果是开始节点,直接返回SUCCESS2. 获取到依赖节点进行遍历:2.1 如果dag不包含依赖节点或者skipTaskNodeList与forbiddenTaskList中包括依赖节点,continue2.2 completeTaskList中不包括的话返回WAITING2.3 依赖任务未暂停和取消的话返回WAITING2.4 条件任务则continue2.5 依赖任务不成功则返回FAILED
否
1. 获取executorManager,当前为worker类型2. 选择host,这里根据一个权重规则进行选择
按不同类型的任务实例new不同的任务执行线程,提交到任务执行线程池中进行执行
启动定时任务QuartzExecutors
1. 先判断自身状态,dead了就停止2. 判断节点资源是否充足,不充足就非正常3. 将负载等信息写入zk
1. 获取锁2.获取活跃线程数量3. 取一条待执行的命令
先提交到数据库,然后构建一个TaskPriority对象后分发到一个优先级队列taskUpdateQueue
zkMasterClient中使用初始化好的zkClient
删除掉zk中的deadmaster
注册一个监听
初始化状态监听
检查Master的cpu和内存,高于给定值则休眠等待
@PostConstruct init1. 初始化一个master执行服务的线程池,“Master-Exec-Thread”2. 初始化一个Netty客户端
获取容错分布式锁/ds/lock/failover/masters/ds/lock/failover/workers
从zk上获取到master节点从zk上获取到worker节点
1. 已经在readyToSubmitTaskQueue队列中continute2. 已经在completeTaskList中continute3. 非暂停和取消的任务加入到readyToSubmitTaskQueue
1. 获取commandType2. 获取cmdParams3. processInstanceId=0
启动Master的调度服务线程
获取原来的processInstanceId,并设置一些参数
初始化一个NettyServer,并注册三个Processor
启动一个master心跳汇报线程,上报master指标
afterPropertiesSet()
afterPropertiesSet注册worker和client两个executorManager
循环等待Master注册成功
ServerNodeManager.javaafterPropertiesSet()中从zk中加载node,起一个线程来定时去同步worker节点信息,同时初始化MasterNode和WorkerGroupNode的监听器
创建zk永久节点/ds/nodes/master/ds/nodes/worker
while循环
endProcess()
constructProcessInstance()
new MasterExecThread会初始化一个taskExecService线程池和一个netty客户端
prepareProcess()
cmdParams是否为空
根据任务节点创建任务实例
登录成功
更新流程实例的状态1. activeTaskNode大于0或者存在需要重试的任务,则为运行时状态,包括:READY_STOP,READY_PAUSE,WAITING_THREAD,RUNNING_EXECUTION2. 流程实例失败,则为FAILURE3. 存在等待线程的任务,则为WAITING_THREAD4. 如果为READY_PAUSE,则存在重试任务则FAILURE;暂停任务的列表不为空或者readyToSubmitTaskQueue队列不为空则为PAUSE;其余为SUCCESS;5. 如果为READY_STOP,则:stoplist和killlist不为空则STOP;其余为SUCCESS;6. 如果为RUNNING_EXECUTION,则获取killList,readyToSubmitTaskQueue队列不为空则为RUNNING_EXECUTION;killTasks不为空则为FAILURE;其余为成功
TaskUpdateQueueConsumerThread
注册一个进程退出的Hook
submitStandByTask()
1. 构建TaskExecutionContext,添加租户信息,队列信息,SQL/DataX/Procedure/Sqoop2. 构建ExecutionContext3. dispatcher.dispatch(executionContext)
initSystemZNode()
注册Master
zk注册中心初始化
启动一个ZKMasterClient,用于容错
获取到DAG中要首次提交的所有任务节点
new一个MasterExecThread,扔到执行线程池去执行
没有可执行的命令,睡一会儿
如果errorTaskList有内容话,则completeTaskList中的所有暂停任务需要设置为kill
removeZKNodePath
重置processInstance的命令参数
至此结束
treeCacheStart
this.masterSchedulerService.start()Master调度线程:MasterSchedulerThread
1. 停止标志2. 关闭线程池、NettyServer、zkClient、Quartz实例
判断当前要提交任务的依赖任务执行结果,为SUCCESS则提交任务,FAILED加入到对应队列
this.zkMasterClient.start()
MasterServer
initTaskQueue()
遍历activeTaskNode1. 找不到任务,则任务提交失败,同时从activeTaskNode移除2. 任务完成,从activeTaskNode移除;3. 任务成功,则加入到completeTaskList4. 任务失败分为几种情况:4.1 任务需要容错,则加入到recoveryToleranceFaultTaskList中4.2 任务需要重试,则加入到readyToSubmitTaskQueue4.3 任务不需要重试,加入到completeTaskLIst,如果是条件任务继续submit满足条件的节点,非条件的加入到errorTaskList,同时流程实例的失败策略为结束的话要kill掉流程中的其他任务处理线程5. 停止和暂停的任务加入到completeTaskList中
创建zk临时节点/ds/nodes/master/master-ip并监听状态
文本
scheduleProcess()
进入到循环,直到流程实例停止
cmdParams是否存在WaitingThreadInstanceId的key
创建三个zknode/ds/nodes/master/ds/nodes/worker/ds/dead-server
如果节点资源满足条件可以提交任务了,则提交任务
buildFlowDag()
记住密码
command命令不为空且流程为上线状态
活跃Master节点为1的话开启容错
是
构建一个zkClient
LowerWeightHostManager.java@PostConstruct中初始化一个定时执行的线程来计算节点的workerHostWeightsMap
submitPostNode(null)
消费
生成一个新的processInstance
初始化系统znode
RefreshResourceTask()
1. 根据context获取到workerHostWeights2. 找到worker组中权重最小的worker节点3. 将该节点set到ExecutionContext中
权重计算 = cpu * 10 + memory * 20 + loadAverage * 70
实例超时发送告警邮件
操作command,返回一个processInstance
获取一个Quartz实例,并进行初始化,然后start
获取分布式锁/ds/lock/failover/startup-master
NettyExecutorManager.execute()
processInstanceId=cmdParams.get(xxx)
生成一个新的processInstanceId
runProcess()
1. 根据command构建一个流程实例2. 检查线程是否够用3. 补充实例信息4. 保存流程实例并删除command
0 条评论
下一页