Flink源码——Job 提交、部署流程源码分析之构建JobGraph
2022-04-08 16:41:33 13 举报
Flink源码——Job 提交、部署流程源码分析之构建JobGraph
作者其他创作
大纲/内容
获取一个 StreamGragh Translator
当前要处理的 StreamNode
获取 startStreamNode
return new StreamConfig(jobVertex.getConfiguration());
PlanTranslator planTranslator = new PlanTranslator();
\tfinal ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
! currentNodeId.equals(startNodeId)
添加 UserArtifactEntries
创建一个 JobVertex
6、两个节点间物理分区逻辑是 ForwardPartitioner
2
设置 SlotSharingAndCoLocation
9、用户没有禁用 chain
new StreamConfig(new Configuration())
treamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
\tStreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
设置 PhysicalEdges
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
设置slot的共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行CoLocationGroup =设置标识共存组的键。具有相同共定位键的操作符将由调度器,将它们对应的子任务放到相同的槽中
if(currentNodeId.equals(startNodeId))
用于创建 RestClusterClient 的Provider: ClusterClientProvider
edge.getShuffleMode() != ShuffleMode.BATCH;
递归
生产JobGraph pipeline 其实就是 StreamGraph
设置托管内存权重比(托管内存是由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。)
生成一个 JobVertexID
return clusterClient.submitJob(jobGraph)
chain 结束
2、上下游节点都在同一个 slot group 中
见:Flink源码——Job 提交、部署流程源码分析之提交jobGraph文件以及Job资源到集群
FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
setPhysicalEdges();
1
configureCheckpointing();
是个递归方法,当一个 SteamEdge chain 在一起了之后,也有可能继续与前一个 Operator 或者 后一个 Operator chain 在一起
(edge.getPartitioner() instanceof ForwardPartitioner);
return jobGraph;
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
添加 OperatorCoordinator
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
int parallelism = streamNode.getParallelism();
循环
返回 JobGragh
!(downStreamOperator == null || upStreamOperator == null);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
\tchainableOutputs.add(outEdge);
创建访问器,公开与基础的执行相关的配置设置。
chain 在一起的多条边 connect 在一起
StreamExecutionEnvironment.executeAsync()
4
streamGraph.isChainingEnabled();
判断是否能 chain 在一起九种条件全部满足
return streamGraph.getJobGraph(null);
JobClient jobClient = jobClientFuture.get();
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
设置最大并行度
return getJobGraph(null);
for(StreamEdge chainable : chainableOutputs) 把可以 chain 在一起的 StreamEdge 两边的 Operator chain 在一个形成一个 OperatorChain\t
N
将生成好的 JobVertex 加入到: JobGraph
//注释: 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中\t\t// 注释: 出边集合,已经在 上面的代码中,已经搞定了
setSlotSharingAndCoLocation();
生成obGragh
设置 SavepointRestoreSettings
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
设置 SnapshotSettings, checkpoint 相关的设置
传递执行环境配置, 设置 ExecutionConfig
7、两个算子间的shuffle方式不等于批处理模式
\tjobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
异步提交执行 StreamGraph跳转到: AbstractSessionClusterExecutor 的 execute() 方法
*注释: StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构 *注释: 关于 JobGraph 的三个重要的概念: * 注释: 1. JobVertex: JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode; * 注释: 2. IntermediateDataSet: 它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集; * 注释: 3. JobEdge: 它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。
存储可 chain 的 StreamEdge
jobGraph.addVertex(jobVertex);
clusterClient = RestClusterClient
currentNodeId.equals(startNodeId)
* 1、一个 StreamNode 也可以认为是 做了 chain 动作 StreamNode -> JobVertex\t\t * 2、两个 StreamNode 做了 chain 动作 StreamNode + StreamNode -> JobVertex
跳转
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
根据 StreamGraph 通过 StreamingJobGraphGenerator 来创建一个 JobGragh
Y
JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构它包含的主要抽象概念有:1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。 2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。 producer 是JobVertex,consumer 是 JobEdge。 3、JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
获取 JobVertex 的并行度
JobVertexID jobVertexId = new JobVertexID(hash);
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
StreamGraphTranslator.translateToJobGraph()
8、上下游的并行度一致
if(chainableOutputs.isEmpty()) {\t\t\t\tconfig.setChainEnd();\t\t\t}
config.setChainStart();
通过 StreamGraph 转换得到 JobGragh
3
* 1、MiniClusterClient 本地执行\t\t\t * 2、RestClusterClient 提交到 Flink Rest 服务接收处理
1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
return jobClient;
设置 ScheduleMode
final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);
StreamingJobGraphGenerator.createJobGraph()
\tconfig.setChainIndex(chainIndex);
downStreamVertex.getInEdges().size() == 1
nonChainableOutputs.add(outEdge);
setManagedMemoryFraction()
4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认 是 HEAD)
不能 chain 一起的话,这里的 chainIndex 是从 0 开始算的,后面也肯定会走到 createJobVertex 的逻辑\t\t\t\t
Flink源码(1.11.x) Job 提交、部署流程源码分析之构建JobGraph
chain在一起的多个 Operator 创建成一个 JobVertex
获取一个 Plan 翻译器
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
通过 FlinkPipelineTranslator 来转换获取到 JobGragh pipeline = StreamGraph
构建 JobGragh 返回
AbstractSessionClusterExecutor.execute()
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
executorServiceLoader = DefaultExecutorServiceLoader
存储不可 chain 的 StreamEdge
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
为节点生成确定性哈希,以便在提交未发生变化的情况下对其进行标识。
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
获取 FlinkPipelineTranslator
5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
提交jobGraph到集群
jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
jobGraph.addJars(executionConfigAccessor.getJars());\t\tjobGraph.setClasspaths(executionConfigAccessor.getClasspaths());\t\tjobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
3、前后算子不为空
完成所谓的 StreamGraph 到 JobGraph 的转换
判断一个 StreamGraph 中的一个 StreamEdge 链接的上下游 Operator(StreamNode) 是否可以 chain 在一起
阻塞获取 StreamGraph 的执行结果
接:Flink源码——Job 提交、部署流程源码分析之构建StreamGraph
构建一个 JobGragh 对象
设置托管内存权重比
0 条评论
下一页