DolphinScheduler源码图
2024-07-15 15:21:34 5 举报
DolphinScheduler源码图
作者其他创作
大纲/内容
前置流程实例ip等于本机ip通知
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
if (task.taskCanRetry())
while无限循环
submitTaskExec(task)(WorkflowExecuteRunnable);
线程执行成功回调函数future.addCallback(new ListenableFutureCallback())
run()
添加各种Netty处理器processor
for (String subsequent : startVertexes)
从workflowEventQueue拉取workflowevent事件:workflowEvent = workflowEventQueue.poolEvent();
启动线程(容错处理)
buildFlowDag();
DependResult dependResult = getDependResultForTask(task);(获取task的依赖结果,若没有依赖默认为SUCCESS)
masterRegistryClient.start()
workflowExecuteThread.addStateEvent(stateEvent);
this.stateEvents.add(stateEvent);
TaskInstance task = readyToSubmitTaskQueue.peek();
2.对1中的各节点创建任务实例
1.获取接下来要执行的流程
MasterServer
submitStandByTask();
异步执行
run()
远程发送TaskKillRequestCommand
onSuccess
4.run
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
processService.createCommand(command);
最终执行notifyAll一样的逻辑
@PostConstructTaskPriorityQueueConsumer.init()
updateProcessInstanceState();
createCommand
taskProcessor.taskInstance().getState().isFinished()
workerflow事件处理逻辑
Netty是由a target=\"_blank\" href=\"https://baike.baidu.com/item/JBOSS提供的一个java开源框架,现为 a target=\"_blank\" href=\"https://baike.baidu.com/item/Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序
stateEventResponseService.addStateChangeEvent(stateEvent)
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
nettyRemotingServer.start()
WorkflowExecuteRunnable.submitPostNode
开始按DAG执行任务
根据状态类型获取状态处理器
failoverMaster(needFailoverMasterHost);
启动调度
taskProcessor.action(TaskAction.RUN);
前置流程实例ip不等于本机ip通知
启动线程(流程正常结束处理)
逐一处理需要容错处理的流程实例
若无,则休眠1s
只有CommonTaskProcessor有以下分配逻辑,其中BlockingTaskProcessor返回false;其他TaskProcessor返回true
workflowExecuteThread::handleEvents
2.处理要提交的任务列表
masterRegistryClient.setRegistryStoppable(this)
processService.submitTaskWithRetry()
notifyProcessHostUpdate(taskInstance);
所有类型的任务都会将任务实例写进数据库(action的逻辑)
while (!this.stateEvents.isEmpty())
异步调用call方法:workflowExecuteRunnable::call
2.修改流程状态
共有以下六个proc:1.BlockingTaskProcessor2.CommonTaskProcessor3.ConditionTaskProcessor4.DependtTaskProcessor5.SubTaskProcessor6.SwitchTaskProcessor
构建DAG
removeTaskFromStandbyList(task);
this.eventExecuteService.start();
this.failoverExecuteThread.start();
实例化任务执行器
根据taskinstance生成的TaskExecutionContext对象,然后构建taskPriority对象
masterFailoverService.checkMasterFailover();
1.逐一处理queue任务中submitStandByTask方法逻辑
当处理成功时
将任务实例逐渐提交到队列中(过滤已完成和stop的)
1.任务入列
执行StateEventHandler实现类(共7种)
启动线程:super.start();命令转换流程执行器
processInstance.getHost().equalsIgnoreCase(address)
initDependParameters();
如果task是第一次运行,则传递前置任务的参数
调用workflowEventHandler处理该事件:workflowEventHandler.handleWorkflowEvent(workflowEvent);
获取前置工作流实例和任务实例
masterRPCServer.start()
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable()
启动nettyRemotingServer
ProcessScheduleTask.executeInternal(JobExecutionContext context)
RPC是远程过程调用
workflowEventHandler()
若DependResult.SUCCESS == dependResult
this.submitStateEvent(stateEvent);
next
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
addTaskToStandByList(task);
font color=\"#4d4d4d\
take:在command扫描线程中启动了workflowEventLooper线程用于消费workerFlowEvent
1.通知worker修改Host如果任务处理执行中且任务类型为common,则通知worker修改任务实例的HOST,用于执行后回应。(当master宕机时,task所在的master可能发生变化)
(1)若preNodeCode为null,则取DAG的起始节点(2)若节点为ConditionsTask,则找出符合条件的后置节点(3)若节点为SwitchTask,则找出符合条件的后置节点(4)以上均不符合,则取preNodeCode为后置节点
创建连接状态监听器ConnectionStateListener
通过spi加载TaskChannelFactory
绑定服务停机的勾子
StateEventHandler stateEventHandler = StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
2.SUBMIT
stateEvent = this.stateEvents.peek();
sendKillCommandToWorker(taskInstance);
postNodeList.add(subsequent);
按任务实例逐一处理(不包括swicth等master任务)
WorkflowExecuteRunnable.submitPostNode()
taskProcessor.action(TaskAction.SUBMIT);
this.stateEvents.remove(stateEvent);
根据schedule绑定的工作流定义编码查询工作流定义,根据工作流定义生成command,然后入库
通知worker
stateEventProcessor
submitPostNode(null);
如果任务支持重试且任务被强制成功,则停止正在重试的任务
schedulerApi.start()
taskPluginManager.loadPlugin();
workflowExecuteThread.workFlowFinish()
创建WorkflowExecuteRunnable 写入到workflowEventQueue中:workflowEventQueue.addEvent(new WorkflowEvent();
3.DISPATH
远程发往其他Master workflowStateEventChangeCommand
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
readyToSubmitTaskQueue.put(taskInstance);
taskProcessor.action(TaskAction.DISPATCH)
过滤掉要跳过的节点,跳过已完成或者禁用的节点
List<Command> commands = findCommands();
每个processInstance创建一个WorkflowExecuteRunnable
doFailoverMaster(masterHost);
启动线程,解析DAG图workflowEventLooper.start();
如果parentNodeCode是dag分支的结束节点,则将当前节点的自定义参数合并到流程实例参数中
workflowEventQueue
初始化任务执行器
taskUpdateQueue.put(taskPriority);(队列实例taskPriorityQueueImpl)
如果工作流结束,则通知父流程状态发生变化
提交第一批即将执行的节点
遍历:for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll())
initTaskQueue
获取WorkflowExecuteRunnable:WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId()
执行异常,重新入列:workflowEventQueue.addEvent(workflowEvent);
this.masterSchedulerBootstrap.init();(初始化一个MasterPreExecThread的线程池)this.masterSchedulerBootstrap.start();
收藏
0 条评论
下一页