Flink源码流程
2022-03-23 16:50:15 1 举报
为你推荐
查看更多
flink源码流程
作者其他创作
大纲/内容
execute(getStreamGraph(jobName));一直往下走最终到org.apache.flink.streaming.api.graph.StreamGraphGenerator#generate发现里面有:
font color=\"#ff0000\
我们进入create跳转到org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory#create是一个抽象方法,找到实现方法org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create发现有以下功能:这边dispatcher是一些高可用的服务,getDispatcherLeaderRetriever这个可以找到它当前的leader是谁(我们高可用有主和备)①// 大家注意这边的每个组件都有自己的高可用服务dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();②//创建 ResourceManager:Yarn模式的ResourceManager,注意这边是flink的RM,进入createResourceManager//创建ResourceManager对象,返回的是new YarnResourceManagerresourceManager = resourceManagerFactory.createResourceManager(…) ③ 创建和启动 Dispatcher => dispatcher会创建和启动JobMaster,dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(…)④ 启动 ResourceManager这边在启动RM的时候还会顺带去连接Yarn的RM和NM这边JobMaster和Dispatcher都启动之后才会去启动ResourceManagerresourceManager.start();
5.3
45
正式启动 Dispatcher,Dispatcher主要负责两件事情:1.接收用户的作业,2.启动JobMaster,进入start因为Rpc服务是是基于Akk的组件通信,那么它如果调用start那对应的对方会去执行 onStart()方法这个方法相当于接收到消息做出回应我们不管通信过程的话就直接找dispatcher里面的onStart()方法,上面就有Dispatcher类直接进入然后后接着看 onStart()dispatcher.start();启动了dispatcher之后我们进入Dispatcher找到org.apache.flink.runtime.dispatcher.Dispatcher#onStart,因为我们用的是Akka通信,既然有start命令开启一个dispatcher,就看到会有onStart进行接收,我们在onStart看到① 启动 dispatcher服务,startDispatcherServices(); ② 启动JobMaster,这边才是启动了JobMaster,startRecoveredJobs();我们进入startRecoveredJobs方法就会发现到达org.apache.flink.runtime.dispatcher.Dispatcher#runJob,这个里面调用了org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner这个方法,
找到org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute()方法,这个方法里面我们发现了一段代码execute(getStreamGraph(jobName)) 获取StreamGraph,并接着执行,这边getStreamGraph(jobName)是StreamGraph 入口,我们进入execute方法会到达org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph)这个方法,这个方法的功能就是:
48.
我们进入startJobMasterServices跳转到org.apache.flink.runtime.jobmaster.JobMaster#startJobMasterServices这个方法里面有:① 启动心跳服务:taskmanager(类似小弟)、ResourceManager。// 因为JobMaster是老大,taskmanager是小弟,这两个要有交互,JobMaster把任务分给TaskManager,TaskManager要定时会报一些情况// ResourceManager(类似管家)和JobMaster(类似主人)保持心跳是因为要确定资源运行情况,资源够不够需不需要申请startHeartbeatServices();② JobMaster内部有slotpool(ResourceManager里面也有一个slotmanager),启动 slotpoolfont color=\"#ff0000\
9
TaskManager启动YarnTaskExecutorRunner是TaskManager的入口类,我们回到org.apache.flink.yarn.YarnResourceManagerDriver#initializeInternal这个方法里面发现:创建Yarn的ResourceManager的客户端,注意不是flink的(flink里面的AM里面的RM向Yarn里面的RM去申请资源,font color=\"#ff0000\
创建Yarn的ResourceManager的客户端,注意不是flink的(flink里面的AM里面的RM向Yarn里面的RM去申请资源,flink里面怎么申请的就是通过创建Yarn里面的RM客户端就是这边代码),并且初始化和启动resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient//初始化resourceManagerClient.init(yarnConfig);//启动resourceManagerClient.start();//注册了一个AMfinal RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();//创建yarn的 NodeManager的客户端,并且初始化和启动(因为容器启动之后,要去启动TaskManager)nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);//初始化nodeManagerClient.init(yarnConfig);//启动 nodeManagerClient.start();//然后返回到ResourceManager类的initialize();
31.进入createScheduler
17.② 进入createClusterDescriptor
6
我们进入execute找到org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor#execute这个方法的功能是:
34.进入③
因为我们程序中都有这一段代码StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()以及最后肯定会写env.execute(\"OrderWideApp\")这个代码;所以我们接下来找到org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute()方法,
24.ClusterEntrypoint#runCluster
Per-job模式的AM container加载运行入口是YarnJobClusterEntryPoint中的main()方法我们找到YarnJobClusterEntrypoint.java类的main方法org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main这个方法里面的功能有:
找到实现类ApplicationClientProtocolPBClientImpl.java的实现方法submitApplication(SubmitApplicationRequest request) 这个方法下面有//取出报文SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl) request).getProto();//将报文发送发送到服务端,并将返回结果构成return new SubmitApplicationResponsePBImpl(proxy.font color=\"#ff0000\
7
3
10.到这边org.apache.flink.client.cli.CliFrontend#main方法就结束了,接下来是新的步骤进入org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute()
41
命令行程序启动flink\\bin\\flink找到org.apache.flink.client.cli.CliFrontend,CliFrontend就是我们的入口类,找到这个类的org.apache.flink.client.cli.CliFrontend#main方法,
5.4
1
40
18.③ 进入getClusterSpecification
这个getYarnJobClusterEntrypoint是YarnJob模式的入口类,在某个容器里面启动了AM,然后ApplicationMaster里面就执行这个路口类getYarnJobClusterEntrypoint():Yarn 集群入口点的类名。
我们先进入这个方法clusterClientFactory.createClusterDescriptor 会跳转到这边org.apache.flink.client.deployment.ClusterClientFactory#createClusterDescriptor这个是一个抽象方法,我们找到实现类org.apache.flink.yarn.YarnClusterClientFactory#createClusterDescriptor进入,发现getClusterDescriptor(configuration);我们进入getClusterDescriptor,跳转到org.apache.flink.yarn.YarnClusterClientFactory#getClusterDescriptor发现有:
回到org.apache.flink.client.cli.CliFrontend#run里面发现有org.apache.flink.client.cli.CliFrontend#getJobJarAndDependencies进入里面发现有jarFilePath这个是获取 用户的jar包(这个JobJar就是我们代码写完打的包)和其他依赖,
这个方法里面我们发现了一段代码execute(getStreamGraph(jobName)) 获取StreamGraph,并接着执行,这边getStreamGraph(jobName)是StreamGraph 入口,
找到ClientRMService.java的submitApplication(SubmitApplicationRequest request)方法,最终这个submitApplication方法里面有//将应用请求提交到Yarn上的RMAppManager去提交任务font color=\"#ff0000\
我们进入到YarnClusterDescriptor,跳转到org.apache.flink.yarn.YarnClusterDescriptor#YarnClusterDescriptor这个类创建了包括以下信息yarn的配置、yarn客户端、flink的配置、用户的jar包是否包含、yarn的队列
29
接着我们进入clusterClientFactory.getClusterSpecification这个方法,发现是一个抽象方法,我们找到org.apache.flink.client.deployment.AbstractContainerizedClusterClientFactory#getClusterSpecification这个方法里面的功能是:① 获取jobManagerMemoryMB这个是jobManager的内存② 获取taskManagerMemoryMB这个是taskManager的内存③ 获取slotsPerTaskManager这个是每个taskManage的slot槽的数量④ 在这边把上面的一个个的参数设置到这个ClusterSpecification叫集群特有配置类里面这边用了建造者模式new ClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMB) .setTaskManagerMemoryMB(taskManagerMemoryMB) .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification();
我们进入到deployInternal方法里面跳转到org.apache.flink.yarn.YarnClusterDescriptor#deployInternal,这个方法里面的功能非常多:
我们进入createResourceManager跳转到org.apache.flink.runtime.resourcemanager.ResourceManagerFactory#createResourceManager(…)是个抽象方法,找到具体的实现方法org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerFactory#createResourceManager(…)这里面有一个new ActiveResourceManager(resourceManagerRuntimeServices.getSlotManager注意在这边同时创建了SlotManager)
4
33.① 进入startJobMasterServices
正式启动 ResourceManager,resourceManager.start();因为有start方法,所以我们找到对应的org.apache.flink.runtime.resourcemanager.ResourceManager#onStart方法里面有启动ResourceManager的核心服,startResourceManagerServices();我们进入startResourceManagerServices,跳转到org.apache.flink.runtime.resourcemanager.ResourceManager#startResourceManagerServices这个方法里面有:创建了Yarn的RM和NM的客户端,初始化并启动,initialize();高可用服务所有会有选举 上面那些都是和yarn的RM和NM有关,下面才是自己的RM。Flink的RM先创建了Yarn的RM和NM的客户端,因为要通信使用。 startResourceManagerServices继续往下面有:
我们每个组件都有高可用的,要去选举,这边leaderElectionService就是每个组件对应的一个选举服务 ,启动dispacher的leader选举leaderElectionService.start(dispatcherRunner);我们继续进入start是一个抽象方法,找到实现org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService#start这个方法中有contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);这边是核心,grantLeadership这方法名要记住因为后面会经常看到,每个组件都有选举服务每个组件都要这么跳过来,最终都要调用这个方法,不同的组件对这个方法有不同的实现,我们继续进入到grantLeadership一直往下走会跳很多层,最后到达org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess#onStart这个onStart里面调用了org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory#create方法继续找到实现类org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory#create最后在这个方法里面发现: 真正创建Dispatcher,注意创建但是没启动,启动是下面一段代码。
5.2
①.创建了YarnClient,这个客户端是用来和Yarn做交互的final YarnClient yarnClient = YarnClient.createYarnClient();②.创建一个Yarn配置,创建好之后不需要传参数就可以直接获取到Yarn配置了,因为环境里面有(就是之前那些各种配置之类的)final YarnConfiguration yarnConfiguration = new YarnConfiguration();③.初始化、启动 YarnClientyarnClient.init(yarnConfiguration);yarnClient.start();④.创建YarnClusterDescriptor集群描述器,new font color=\"#ff0000\
51.下面的所有准备都做好之后就会先Yarn提交任务,接下来就要是Yarn方面的内内容可以在这边找YarnClientImpl.java
47
我们进入到deployJobCluster,调转到org.apache.flink.client.deployment.ClusterDescriptor#deployJobCluster是一个抽象方法,找到实现方法org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster发现有font color=\"#ff0000\
5.1
① 获取executor执行器的工厂,flink使用了大量的java的工厂模式,所以经常看到各种Factory,final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);(这边除了PipelineExecutorFactory 还有YarnJobClusterExecutorFactory、YarnSessionClusterExecutorFactory、KubernetesSessionClusterExecutorFactory等等)② 选择合适的executor提交任务CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).font color=\"#ff0000\
在run方法里面里面有org.apache.flink.client.cli.CliFrontend#validateAndGetActiveCommandLine进行客户端的选择,判断方式是①\t是否指定为per-job模式,即指定”-m yarn-cluster”; ID = \"yarn-cluster\"②\t是否存在flink在yarn的appID,即yarn-session模式是否启动③\t executor的名字为 \"yarn-session\" 或 \"yarn-per-job\"三个其中一个满足即可,因为我们是yarn模式,所以最后会选择Yarn-session-cli模式,
org.apache.flink.client.cli.CliFrontendParser#parse方法里面发现调用这个org.apache.commons.cli.DefaultParser#parse方法内部调用了handleToken,这个方法就是具体解析参数的方法,就是各种情况的解析,逻辑大体相同:去除-或--前缀,校验参数,参数校验好之后,回到org.apache.flink.client.cli.CliFrontend#run,
一个关键的参数是 List<Transformation<?>> transformations。Transformation代表了从一个或多个DataStream生成新DataStream的操作。DataStream的底层其实就是一个 Transformation,描述了这个DataStream是怎么来的。DataStream 上常见的 transformation 有 map、flatmap、filter等。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。
37.进入initialize
启动心跳服务:是TaskManager、JobMaster(flink内部的RM还要和JobMaster通信)的心跳,进入startHeartbeatServices可以看到taskManagerHeartbeatManager和jobManagerHeartbeatManagerstartHeartbeatServices();font color=\"#ff0000\
28
2
49.③ 进入startAppMaster
30.进入createJobManagerRunner
47.进入deployInternal
YarnClientImpl.java的submitApplication(ApplicationSubmissionContext appContext)方法中有rmClient.submitApplication(request),进入submitApplication
38.进入YarnResourceManagerDriver#initializeInternal
27.③ 在创建ResourceManager结束之后回到ClusterEntrypoint#runCluste继续往下,创建和启动 Dispatcher,Dispatcher又会会创建和启动JobMaster
① 其中一大推代码都是在校验各种信息② 创建应用、获取应用的id ,final YarnClientApplication yarnApplication = yarnClient.createApplication();font color=\"#ff0000\
50.
bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount./WordCount.jar
JobMaster的启动:org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunne这个方法里面同时也在这个方法里面启动了JobMaster就是runner.start();因为有一个start肯定有一个onStart方法要找的话就要到JobManagerRunnerImpl里面找,注意这边启动JobMaster是通过JobManagerRunner它是一个接口,所以你要找到JobManagerRunner的实现类的org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl#start方法,这个方法里面也有可用的leader选举服务,每个小组件都有,这又是一股熟悉的方法,leaderElectionService.start(this);也涉及到高可用,一直往下找发现org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl#grantLeadership这个是真正的选举,这个方法里面有org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager进入之后发现startJobMaster(leaderSessionId);继续进入最后找到org.apache.flink.runtime.jobmaster.JobMaster#start,继续找到org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution最终这个方法里面有:① 真正启动JobMaster服务,进入startJobMasterServicesstartJobMasterServices();② 重置和启动调度器,这边是物理执行图的开始,进入resetAndStartSchedulerresetAndStartScheduler();
35.进入resetAndStartScheduler开始物理执行图
19.④ 进入deployJobCluster
进入execute方法会到达org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph)这个方法,这个方法的功能就是:
进入executeProgram,会跳到org.apache.flink.client.ClientUtils#executeProgram,这个方法有以下几个功能:① getUserCodeClassLoader用户代码类加载器,就是我们写的类② 配置环境的上下文,用户代码里的 getExecutionEnvironment就会拿到这些环境信息ContextEnvironment这个是当前环境的上下文对象,就是我们写代码开头的第一行那个获取执行环境就是从这边来的,还有StreamContextEnvironment也是环境对象都是从这边设置的③ org.apache.flink.client.program.PackagedProgram#invokeInteractiveModeForExecution调用用户代码的main方。
我们进入到font color=\"#ff0000\
ApplicationClientProtocolPBClientImpl
5
在这个方法里面有选择创建哪种类型的客户端这里面依次添加了 Generic、Yarn(这个模式会设置一个org.apache.flink.yarn.cli.FlinkYarnSessionCli,后面会启动这个类)和Default三种命令行客户端(后面根据isActive()按顺序选择),
org.apache.flink.client.cli.CliFrontend#run
ClientRMService
我们进入到startAppMaster,会跳转到org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster这个方法里面就是一大推各种校验上传文件之类的大概300多行:初始化文件系统(HDFS)上传文件的工具类yarn重试次数,默认2不是高可用重试次数为1// 多次调用上传HDFS的方法,分别是:// => systemShipFiles:日志的配置文件、lib/目录下除了dist的jar包// => shipOnlyFiles:plugins/目录下的文件// => userJarFiles:用户代码的jar包fileUploader.registerMultipleLocalResources (... ...);// 上传和配置ApplicationMaster的jar包:flink-dist*.jar// 上传flink配置文件// 将JobGraph写入tmp文件并添加到本地资源,并上传到HDFS// 上传flink配置文件// jobmanager内存配置,因为要启动ApplicationMaster,JM就是在里面运行的,所以内存之类的相关参数要配置//封装启动AM container的Java命令,这边就是安装启动ApplicationMasterContainer容器final ContainerLaunchContext amContainer = font color=\"#ff0000\
创建和启动 Dispatcher => dispatcher会创建和启动JobMaster,dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(…)我们进入createDispatcherRunner,跳转到org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory#createDispatcherRunner抽象方法找实现方法org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner这个具体方法里面有DefaultDispatcherRunner.create,我们进入create,再进入DispatcherRunnerLeaderElectionLifecycleManager.createFor的createFor,一直往下跳转到org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerLeaderElectionLifecycleManager#DispatcherRunnerLeaderElectionLifecycleManager发现有
找到ApplicationClientProtocolPBServiceImpl.java的font color=\"#ff0000\
44
11.getStreamGraph(jobName
42
8
21.找到并进入org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main
23.③ 进入runClusterEntrypoint
回到org.apache.flink.client.cli.CliFrontend#run往下找org.apache.flink.client.cli.CliFrontend#getEffectiveConfiguration这个方法是一个抽象的方法,进入getEffectiveConfiguration方法找到org.apache.flink.yarn.cli.FlinkYarnSessionCli#toConfiguration会在这边的方法里面把我们的各种配置参数设置到org.apache.flink.configuration.Configuration这个类中,后面的参数会从这个类获取,
25
org.apache.flink.runtime.jobmaster.JobMaster#createScheduler这个方法里面用到工厂模式schedulerNGFactory.createInstance,接着一直深入到org.apache.flink.runtime.scheduler.SchedulerBase#createExecutionGraph发现这个方法里面有ExecutionGraphBuilder.buildGraph这边就是真正的创建ExecutionGraph
发现会调用这个类org.apache.flink.client.cli.CliFrontendParser,这个类是存了各种参数符号,后面要做匹配用,然后进入
回到org.apache.flink.client.cli.CliFrontend#run发现下面调用了org.apache.flink.client.cli.CliFrontend#executeProgram这个方法就是核心逻辑:执行程序
22
14.② 进入execute
我们进入font color=\"#ff0000\
26.② 进入createResourceManager
启动后slot pool开始向slot manager请求slot,进入ResourceManagerLeaderListener,一直往下走就会到SlotPoolImpl 这个是SlotPool的实现类,在这个类的org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#requestSlotFromResourceManager方法真正去向申请了Flink里面的ResourceManager的SlotManager申请资源,最后org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#allocateResource(这个allocateResource方法其实就是RM的)的方法去向flink的ResourceManager申请资源,
15
16.① 进入getJobGraph
46
13
入口类org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution方法开始一直往下追到org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations#deploy这边就是真正的开始部署,继续到org.apache.flink.runtime.executiongraph.Execution#deploy这边就是包含了从Execution Graph到真正物理执行图的转换,比如将IntermediateResultPartition转化成ResultPartition,ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。如果一直往下追,最后会到达org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#processElement这个已经是最底层了这边会看到流状态、判断是否是水印、判断是否是迟点的数据这些东西,这边还有一个功能就是emitRecord找到实现方法org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.StreamTaskNetworkOutput#emitRecord这里面有调用processElement找实现OneInputStreamOperator(这下面非常多算子,每个算子对应一个operator),比如org.apache.flink.streaming.api.operators.StreamMap#processElement发现有output.collect(element.replace(userFunction.map(element.getValue())));userFunction.map() 就是用户定义的MapFunction里的map方法element这个是数据把它replace替换成userFunction用户定义的函数(就是用户复写的map{把数据element.getValue()传入})通过collect采集器往下游发送
32.还是进入createJobManagerRunner
ApplicationClientProtocolPBServiceImpl
org.apache.flink.client.cli.CliFrontend#main
顺着org.apache.flink.client.cli.CliFrontend#main方法里面的代码一直往下会进入到org.apache.flink.client.cli.CliFrontend#run这个方法非常重要,进入这个run方法里面,
进入font color=\"#ff0000\
12.进入execute方法
39.开始启动YarnTaskExecutorRunne
我们进入ActiveResourceManager,最终创建的就是Flink自己的org.apache.flink.runtime.resourcemanager.ResourceManager#ResourceManager,这个是用来和Yarn的ResourceManager做通信的。
36.④启动RM,进入ResourceManager类
20.进入getYarnJobClusterEntrypoint()
我们进入initialize,跳转到org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#initialize再次进入initialize方法一直进入initialize方法直到org.apache.flink.yarn.YarnResourceManagerDriver#initializeInternal(这个方法很重要会对接到TaskManager启动那边)这边方法里面有:
43
0 条评论
回复 删除
下一页