Flink源码——CliFronted 提交流程(客户端提交Flink应用Jar包启动)
2022-04-08 17:19:44 11 举报
Flink源码——CliFronted 提交流程(客户端提交Flink应用Jar包启动)
作者其他创作
大纲/内容
\tfinal PackagedProgram program = getPackagedProgram(programOptions);
flink.txt文件
执行
获取 main 方法实例
得到 运行主类的 main 方法实例entryClass 自己编写的应用程序
mainMethod = entryClass.getMethod(\"main\
加载得到 main Class
检查参数的长度
if(args.length < 1) {\t\t\tCliFrontendParser.printHelp(customCommandLines);\t\t\tSystem.out.println(\"Please specify an action.\");\t\t\treturn 1;\t\t}
org.apache.flink.client.cli.CliFrontend
提交jobaction = run
通过 FLINK_CONF_DIR 变量找到 conf 目录1. find the configuration directory
hasMainMethod(mainClass)
Flink源码(1.11.x) CliFronted 提交流程(客户端提交Flink应用Jar包)
解析好的对应的参数
运行\t\t\t * 解析命令行并并开始请求操作\t\t\t *\tflink run class arg1\t\t\t * args[0] = run
有效配置 1、activeCommandLine2、commandLine3、programOptions\t程序参数4、jobJars\t\t\t依赖jar
去提交一个 jar 到 Fink 集群运行
return createPluginManagerFromRootFolder(PluginConfig.fromConfiguration(configuration));
加入一个: FlinkYarnSessionCli
\tfinal Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
依赖jar处理
打印输出一些环境信息
返回 DefaultPluginManager
flink run 提交程序执行的入口类
flink-dist 子项目中,位于 flink-bin 下的 bin 目录
final List<URL> jobJars = program.getJobJarAndDependencies();
加入一个: DefaultCLI
program.invokeInteractiveModeForExecution();
\tprogram = buildProgram(programOptions);
获取第一个参数:flink run 命令中的 run 命令
解析 conf 目录下的 flink-conf.yaml 配置文件2. load the global configuration
构建 Program
初始化DefaultPluginManager
一般在使用 flink run 的时候会指定运行主类,否则不指定的话,就从 mainfest 中解析得到
return PackagedProgram.newBuilder()\t\t\t.setJarFile(jarFile)\t\t\t.setUserClassPaths(classpaths)\t\t\t.setEntryPointClassName(entryPointClass)\t\t\t.setConfiguration(configuration)\t\t\t.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())\t\t\t.setArguments(programArgs)\t\t\t.build();
设置主类: .setEntryPointClassName(entryPointClass)
加载:FlinkYarnSessionCli 和 DefaultCLI3、load the custom command lines
调用运行主类的 main 方法跳转到运行主类的 main 方法
customCommandLines.add(new DefaultCLI(configuration));
int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseParameters(args));\t\t\tSystem.exit(retCode);
main()
final String configurationDirectory = getConfigurationDirectoryFromEnv();
String action = args[0];
\tString entryPointClass = runOptions.getEntryPointClassName();
run(params);
当用户把 Flink 应用程序打成 jar 使用 flink run ... 的 shell 命令提交的时候,底层是通过 CliFrontend来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。在刚组建内部,主要有以下几件事要做:1、根据 flink 后面的执行命令来确定执行方法(run ==> run(params)) 2、解析 main 参数,构建 PackagedProgram,然后执行 PackagedProgram 3、通过反射获取应用程序的 main 方法的实例,通过反射调用执行起来总的来说,就是准备执行 Program 所需要的配置,jar包,运行主类等的必要的信息,然后提交执行
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS \"${log_setting[@]}\" -classpath \"`manglePathList \"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS\"`\" org.apache.flink.client.cli.CliFrontend \"$@\"
0 条评论
下一页