DolphinScheduler源码图
2023-02-14 15:27:12 0 举报
DolphinScheduler源码图
作者其他创作
大纲/内容
nettyRemotingServer.start()
addTaskToStandByList(task);
绑定服务停机勾子
若无休眠1秒
processService.createCommand(command)
启动调度
2、处理要提交的任务列表
若执行异常,重新入列workflowEventQueue.addEvent(...)
sendKillCommandToWorker(taskInstance)
nettyRemotingServer = new NettyRemotingServer(serverConfig)
远程发往其他MasterworkflowStateEventChangeCommand
updateProcessInstanceState()
ITaskProcessor taskProcessor=TaskProcessorFactory.getTaskProcessor(taskType)
notifyMyself
if (task.taskCanRetry())
workflowEventHandler.handleWorkflowEvent(workflowEvent)
workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(...)
this.stateEvents.remove(stateEvent)
启动线程(命令转换流程执行器)
共有以下六个TaskProcessor:BlockingTaskProcessorCommonTaskProcessorConditionTaskProcessorDependentTaskProcessorSubTaskProcessorSwitchTaskProcessor
submitStateEvent(stateEvent)
逐一处理需要容错处理的流程实例
遍历所有StreamTaskExecuteRunnable
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
run()
stateEventProcessor
stateEvent = this.stateEvents.peek();
ProcessScheduleTask.executeInternal(JobExecutionContext context))
workflowExecuteThreadPool.executeEvent(workflowExecuteThread)
stateEventResponseService.addStateChangeEvent(stateEvent)
while无限循环
每个工作流实例创建一个WorkflowExecuteRunnable实例
根据Schedule绑定的工作流定义编码查询工作流定义根据工作流定义生成Command,然后入库
通过SPI加载TaskChannelFactory
2、SUBMIT
failoverMaster(needFailoverMasterHost)
实例化任务执行器
supper.start()
buildFlowDag()
registryClient.subscribe(...)
启动NettyServer
前置流程实例IP等于本机IP本地通知
removeTaskFromStandbyList(task)
根据taskInstance生成TaskExecutionContext然后构建TaskPriority对象
notifyProcessChanged(......)
schedulerApi.start()
回应发来StreamTaskEvent的Worker或其他Master
workflowEventLooper.start()
如果parentNodeCode是dag分支的结束节点,则将当前节点的自定义参数合并到流程实例参数中
masterRegistryClient.start()
异步执行
eventExecuteService.start()
过滤要跳过的节点,跳过已完成的或禁用的节点
所有类型的任务都会将任务实例写入数据库(action的逻辑)
registry()
此处还有任务组处理逻辑
执行StateEventHandler实现类(共7种)
SubTaskk
streamTaskEventHandler()
processService.submitTaskWithRetry(......)
提交第一批即将执行的节点
masterFailoverService.checkMasterFailover()
handleTaskEvent(taskEvent)
this.stateEvents.add(stateEvent)
take
开始按DAG顺序执行任务
failoverExecuteThread.start()
通知worker
workflowExecuteRunnable::call
next
taskProcessor.action(TaskAction.DISPATCH)
masterSchedulerBootstrap.init()masterSchedulerBootstrap.start()
masterRPCServer.start()
远程发送TaskKillRequestCommand
当处理成功时
while (!this.stateEvents.isEmpty())
taskProcessor.taskInstance().getState().isFinished()
前置流程实例IP不等于本机IP远程通知
1、获取接下来要执行的节点
如果任务支持重试且任务被强制成功,则停止正在重试的任务
@PostConstructTaskPriorityQueueConsumer.init()
List<Command> commands = findCommands()
2、修改流程状态
WorkflowExecuteRunnable.submitPostNode(String parentNodeCode)
streamTaskExecuteRunnable::handleEvents
按任务实例逐一处理(不包括switch等master任务)
1、逐一处理任务queue中的任务submitStandByTask方法的逻辑
taskProcessor.action(TaskAction.SUBMIT)
添加各种Netty处理器Processor
submitTaskExec(task)(WorkflowExecuteRunnable)
StateEventHandler stateEventHandler =StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
sendAckToWorker(taskEvent)
1、通知worker修改Task的Host如果任务处理执行中且任务类型为common,则通知Worker修改任务实例的Host,用于执行后回应。(当master宕机时,task所在master可能会发生变化)
3、DISPATCH
最终执行notifyMyself一样的逻辑
根据状态类型获取状态处理器
submitStandByTask()
workflowEvent = workflowEventQueue.poolEvent()
2、对1中的各个节点逐一创建任务实例
taskUpdateQueue.put(taskPriority)(队列实例:TaskPriorityQueueImpl)
将任务实例逐一提交到队列中(过滤已完成、正在stop的实例)
processInstance.getHost().equalsIgnoreCase(address)
只有CommonTaskProcessor有以下分派逻辑其中BlockingTaskProcessor返回false,其他TaskProcessor返回true
workflowEventQueue
DependentTask初始化依赖参数
if (workflowExecuteThread.workFlowFinish())
构建DAG
若DependResult.SUCCESS == dependResult
postNodeList.add(subsequent)
stateEventCallbackService.sendResult(......)
线程执行成功回调函数
workflowEventHandler()
DependResult dependResult = getDependResultForTask(task)(获得task的依赖结果,若没有依赖默认为SUCCESS)
notifyProcess
workflowEventQueue.addEvent(...)
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(...)
1、任务入列
workflowExecuteThread.handleEvents()
for (String subsequent : startVertexes)
doFailoverMaster(masterHost)
4、RUN
notifyProcessHostUpdate(taskInstance)
启动线程(容错处理)
taskProcessor.action(TaskAction.RUN)
MasterServer
获取前置工作流实例和任务实例
submitPostNode(null)
workflowExecuteThread.addStateEvent(stateEvent)
masterRegistryClient.setRegistryStoppable(this)
启动线程(流程正常结束处理)
VarPool(变量池,任务之间参数传递规则):1、如果当前任务为首节点,则直接取流程实例的varPool(子流程才有);2、遍历每个前置任务实例的变量池中的所有参数,先设置为IN类型,再合并放在一个Map中,合并规则:(1)如果当前参数值为空,则不覆盖已有同名参数值;(2)如果当前参数值不为空,则对比已有同名参数所属任务实例的执行结束时间,取结束晚的任务实例的参数值。(3)如果不是上述两种情况,则直接添加到Map中。
遍历所有WorkflowExecuteRunnable
启动线程(解析流程DAG图)
TaskInstance task = readyToSubmitTaskQueue.peek()
如果实例状态有变化,则添加状态事件
如果task是第一次执行,则传递前置任务的参数
初始化任务执行器
readyToSubmitTaskQueue.put(taskInstance)
initDependParameters()
onSuccess
initTaskQueue()
taskPluginManager.loadPlugin()
(1)若preNodeCode为空,则取DAG的起始节点(2)若节点为ConditionsTask,则找出符合条件的后置节点(3)若节点为SwitchTask,则找出符合条件的后置节点(4)以上均不符合,则取preNodeCode的后置节点
如果工作流结束,则通知父流程状态变化
0 条评论
回复 删除
下一页