Flink内核源码解析
2023-10-05 09:47:01 7 举报
AI智能生成
Flink内核源码解析
作者其他创作
大纲/内容
pro-Job:提交命令flink后面都是args
CliFrontend:客户端
YarnJobClasterEntryPoint:Yarn的JobManager
TaskExectorRuner:task任务运行
org.apache.flink.client.cli.clifrontend:程序的入口
config.sh:环境信息的获取
1.getConfigurationDirectoryFromEnv:查找配置目录
2.loadConfiguration:加载全局配置 获取 Flink yaml 配置文件 加载 YAML 资源
3.loadCustomCommandLines:加载自定义命令行 commandline:依次添加 GenericCLI-->Yarn-->DefaultCLI 最后必须添加 DefaultCLI,active CustomCommandLine按顺序排列,DefaultCLI is Active 始终返回 true
4.CliFrontend:实例化客户端然后将全局配置和自定义命令行放进去 给CliFrontend的配置创建安全配置
1.检查操作 如果参数小于1 打印帮助界面 并请指定一个操作
2.获取参数
3.匹配操作:提交的参数是什么参数 run运行 rumApplication操作运行应用程序 list集合 info信息 cancel取消 stop停止 savepotin提交检查点
parseAndRun方法:分析命令行参数并启动请求的操作
5. parseAndRun:客户端调用方法 解析并运行配置
1.getRunCommandOptions:获取run动作,默认配置选项 buildGeneralOptions:加载默认的配置getProgramSpecificOptions:获取特定于程序的选项(可识别的配置项)
2.getCommandLine:获取命令行 mergeOptions:将命令行选项和自定义配置进行一个匹配 parse:根据指定的选项和属性分析参数( handleToken:处理任何命令行令牌(- -- 赋值))
3.validateAndGetActiveCommandLine:根据之前添加的顺序 挨个判断 是否 Generic、Yarn、Default
4.ProgramOptions:引用JAR文件程序和命令行选项加载 -c类路径 -j其它jar包 -p并行度 -d分离模式
5.getJobJarAndDependencies:获取作业Jar和其它依赖项
6.getEffectiveConfiguration:获取有效的配置(HA的ID、target(session、per-job)、JobManager任务、taskManager内存、每个Tm的Slot数量......)
7.PackagedProgram:此类封装表示一个程序 将ProgramOptions和有效配置封装起来
8.executeProgram:执行程序
run方法:执行运行操作
1.isActive
2.GenericCLI 中的 isActive: -e:已弃用、请改用 -t 选项,该选项也可与“应用程序模式”一起使用、用于执行给定作业的执行程序的名称 -t:给定应用程序的部署目标
3.FallbackYarnSessionCli 中的 isActive: -m Yarn-Cluster 或者 yarn有应用ID 或者 命令行指定了 或者 执行器是yarn的 只要满足一种 表示活动的
4.DefaultCLI 中的 isActive:直接返回true
validateAndGetActiveCommandLine方法
1.setContextClassLoader:配置执行环境上下文,用户代码里的getExecutionEnvironment就会拿到这些环境信息 (执行程序服务加载程序、配置、用户代码类装入器、强制实施单作业执行、抑制系统输出)
2.invokeInteractiveModeForExecution:监视当前线程的用户系统退出 callMainMethod:调用主方法(mainMethod.invoke:调用用户代码的main方法 执行代码)
executeProgram方法
CliFrontend类
1.execute:最后执行代码调用的方法
1.getJobGraph:将流图(StreamGraph) 转换成 作业图(JobGraph)
1.getClusterDescriptor:创建了YarnClient 初始化、启动了YarnClient
1.初始化、创建Hadoop的文件系统(HDFS)
2.YarnApplicationFileUploader:Yarn应用的文件上传器(FS、对应的HDFS路径) 高可用配置 默认2次、添加用户Jar包 上传flink依赖 将log4j配置文件 插入进去 上传flink的配置文件(flink.conf.yaml)
3.flink主要的依赖:dist.jar yarn能帮我们启动flink
4.创建Map用来存储AM的环境变量和类路径
5.将之前封装的map 设置到容器里
6.submitApplication(Yarn的提交流程):前面做了很多上传、环境配置、终于可以上传提交应用了
StartAppMaster方法
YarnClusterClientFactory类
2.createClusterDescriptor:集群描述器 创建、启动了 YarnClient,包含了一些yarn、flink的配置和环境信息(YarnClusterClientFactory)
3.fromConfiguration:创建一些简单的配置项
4.getClusterSpecification:获取集群特有资源配置:jobManager内存、taskManager内存、每个TaskManager的slot数
5.deployJobCluster:部署任务(部署前检查:jar包路径、conf路径、yarn最大核数、检查指定的yarn队列是否存在、检查yarn是否有足够的资源--RM_scheduler_minimum_allocation(yarn最小分配的参数) -d分离模式 提交完可以退出了 normal一直占用 保持连接) 启动任务StartAppMaster
AbstractJobClusterExecutor类:
2.executeAsync:异步触发程序执行。环境将执行导致“接收器”操作的程序的所有部分(AbstractJobClusterExecutor)
StreamExecutionEnvironment类
1.初始化服务:RPC相关
创建和启动 WebMonitorEndpoint(页面相关的)
创建ResourceManager
1.验证Job的状态和启动JobManager
1.确保我们接收Rpc和异步调用 异步不阻塞调用
2.真正启动JobMaster服务:StartHeartBeatServcies
3.重置和启动调度器
4.启动心跳服务:TM、RM
5.启动槽池:slotPool
1.创建注册对象
2.开始注册、注册成功之后 调用onRegistrationSuccess()方法
start方法
RegistereRpcConnection类
ResourceManagerLeaderListener内部类
notifyLeaderAddress:直接通知侦听器,因为我们已经知道JobManager的地址
6.作业准备就绪,尝试建立与RM连接、接收到领导通知后、就会建立连接、slotPool开始请求插槽
7.connectToResourceManager:slotPool连接到ResourceManager请求资源
JobMaster类
2.JobMaster服务启动
JobManagerRunnerImpl类
创建JobManager 启动JobMaster
RunJob方法
1.onStart方法:启动dispatcher服务、启动JobMaster
DisPatcher类
1.disPatcher启动为了做两件事:接收我们的作业、创建和启动Joster (Dispatcher类)
DefaultDisPatcherGateWayServiceFactory.java方法:创建和启动Dispatcher
1.创建Yarn的ResourceManager的客户端,并且初始化和启动
2.创建yarn的nodeManager的客户端,并且初始化和启动
YarnResourceManagerDriver类
1.创建Yarn的RM和NM的客户端,初始化和启动
2.通过选举服务,启动ResourceManager
3.startServiceOnLeadership:启动心跳服务 跟 TaskManager、JobMaster
RM内部的 SlotManager 去向 Yarn的RM申请资源
4.slotManager:启动槽管理器(真正管理的是ResourceManager)
5.startNetWorker:启动节点
ResourceManager类
启动ResourceManager(Yarn模式的)
2.创建和启动 JobManager里的组件(Dispatcher(有高可用)、ResourceManger(有高可用)、JobManager) Dispatcher和ResourceManager的高可用选举都是由StandAlone grantLeaderShip(HighAvailabiliterServices.DEFAULT_LEADER_ID):每个组件选举都是由这个方法
YanJobClusterEntryPoint类
该类是在Yarn容器中运行TaskExecutor的可执行入口点
1.taskExecutor.start():通过Rpc服务,启动TaskExecutor,找它的OnStart方法
1.onStart:启动服务
invokeRegistration:开始注册,调用的是这个方法
TaskExecutorToResourceManagerConnection类
2.连接ResourceManager注册Slot
1.创建和注册 新的这些slot
挂起表示要给分配资源
2.slot排队等待挂起
3.如果等待的slot不为空、分配slot
4.挂起的请求都已满足了,你暂时没事
将槽挂起
5.allocateSlot:如何分配slot
6.requesSlot:分配完之后,通知 TM 提供给 JM
SlotManagerImpl类
3.一直点点 直到SlotManagerImpl类
4.根据RM命令,分配自己的Slot
5.向JobManager提供slot
2.TaskExecutor类
YarnTaskExecutorRunner类
任务提交流程
Flink内核源码解析
0 条评论
回复 删除
下一页