工作流运行
2025-02-18 20:05:00 0 举报
Scheduler
作者其他创作
大纲/内容
服务注册、监听、心跳、容错
registerProcessor方法会把processor注册到serverHandler中
有两种Executor工厂,一种是Syn一种是Asyn
将executor提交到对应的线程池,有两种线程池syncTaskExecuteThreadPool和asyncTaskExecuteThreadPool
workerTaskExecutorFactory.createWorkerTaskExecutor()得到执行器
.......
onTaskFailed
WorkerTaskExecutorFactoryManager#getWorkerTaskExecutorFactory(taskExecutionContext)
........
NettyServerHandler#channelRead()
workerRpcServer.start()
onTaskSuccess()
workerRpcClient.start()
从传输过来的msg里面拿到CommandType,然后从porcessors的map里面拿到对应的Processor
taskPluginManager从最开始run方法中加载
执行器放入线程池中
根据传过来的taskExecutionContext拿到对应的WorkerTaskExecutorFactory
Spark
WorkerTaskExecutorthreadPoolManager
GlobalTaskDispatchWaitingQueueLooper
serverBootstrap.childHandler(initNettyChanel(ch))
taskListenerManager.getTaskStateChangeListeners().forEach().onTaskSuccess
WorkflowExecuteThreadPool
BaseLoopTask#execute()
WorkerTaskExecutorThreadPoolManager#submitWorkerTaskExecutor(baseWorkerTaskExecutor)
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
服务注册、监听、心跳
NettyServerHandler#processReceived
通过SPI加载taskChannelFactoryMap和taskChannelMap
WhaleWorkerServer#run()(通过Spring的@PostConstruct注解)
loopTaskInstanceStatus = queryTaskInstanceStatus(taskExecutionInfo);
MasterSchedulerBootStrap
Zookeeper
BaseWorkerTaskExecutor
MasterServer
WorkerRPCClient
SHELL
MasterRPCClient
processInstances.forEach(this::bootStrapWorkflowInstance);
ITask task = taskChannel.create()
已同步为例
SyncWorkerTaskExecutorThreadPool#submitWorkerTaskExecutor
WorkerTaskDispatchProcessor
taskDispatcherLooperManager.start();
MasterSchedulerBootstrap#bootStrapWorkflowInstance()
MasterSchedulerBootstrap
传过来的msg---->通过JsonUtils转成taskDispatchrequest--->从request中拿到taskExecutionContext
GlobalWorkerTaskExecutorHolder.putWorkerTaskExecutor(syncWorkerTaskExecutor);
WorkerRPCServer经过各种Processor
将Execuotor放入线程池执行
会把NettyServerHandler serverHandler添加到pipeline中
Command监听
SQL
WorkflowEventLooper
执行自定义Task类中的方法
initialize();
Master
task.execute();
taskPluginManager.loadPlugin();
taskExecutionInfo = submitLoopTask();
ch.pipeline().addlast(serverHandler)
this.nettyRemotingServer.start();
DB
SyncWorkerTaskExecutor#execute()
根据状态执行对应的方法
createCommand
WorkerTaskDispatchProcessor#process()
Executor中放入了taskPluginManager,和listenerManager
workflowEventLooper.start();
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
Flink
MasterSchedulerBootstrap#findCommands()
UI
onTaskPause()
Worker
doSubmitNewTask()
API
. . . . . .
MasterSchedulerBootstrap#run()
经过各种ProcessorMasterRPCServer
threadPoolExecutor.execute(syncWorkerTaskExecutor::execute);
0 条评论
下一页