Flink源码——Job 提交、部署流程源码分析之构建StreamGraph
2022-04-08 17:17:56 23 举报
Flink源码——Job 提交、部署流程源码分析之构建StreamGraph
作者其他创作
大纲/内容
返回 VertexID = StreamNodeID
Y
添加一个 StreamNode
定义该当前 SreamNode 的 入边
把生成好的 StreamGragh 引用给返回对象,然后清空,保持当前这个 StreamGraphGenerator 依然是一个空的可再生利用的
N由于是递归调用的,可能已经完成了转换
transformedIds = transformSink((SinkTransformation<?>) transform);
StreamGraphGenerator.generate();
* 1、如果上游 StreamNode 和 下游 StreamNode 的并行度一样,则使用: ForwardPartitioner 数据分发策略\t\t\t * 2、如果上游 StreamNode 和 下游 StreamNode 的并行度不一样,则使用: RebalancePartitioner 数据分发策略
2
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
构建 StreamNode 之间的 边(StreamEdge) 对象
else if(virtualSelectNodes.containsKey(upStreamVertexID))
注释: StreamNode 对应到一个 Operator, * 一个 Operator 对应到一个 Transformation * 一个 Transformation 对应到一个 Function
: 初始化一个容器用来去存储 已经转换过的 Transformation
NOneInputTransformation
見:Job 提交、部署流程源码分析之构建JobGraph
设置各种属性
递归
Flink 的一个 Job,最终,归根结底,还是构建一个高效率的能用于分布式并行执行的 DAG 执行图。1、帮我们把上下游两个相邻算子如果能chain到一起,则chain到一起做优化 2、chain到一起的多个Operator就会组成一个OperatorChain,当OperatorChain执行的时候,到底要 执行多少个 Task,则就需要把 DAG 进行并行化变成实实在在的Task来调度执行一个 Flink 流式作业,从 Client 提交到 Flink 集群,到最后执行,总共会经历四种不同的状态。总的来说:1、Client 首先根据用户编写的代码生成 StreamGraph,然后把 StreamGraph 构建成 JobGraph 提 交给 Flink 集群主节点 2、然后启动的 JobMaster 在接收到 JobGraph 后,会对其进行并行化生成 ExecutionGraph 后调度 启动 StreamTask 执行。 3、StreamTask 并行化的运行在 Flink 集群中的,就是最终的物理执行图状态结构。Flink 中的执行图可以分成四层:StreamGraph ==> JobGraph ==> ExecutionGraph ==> 物理执行图。StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化反序列化传输消耗。ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是JobGraph 的并行化版本,是调度层最核心的数据结构。物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署Task 后形成的图,并不是一个具体的数据结构。 关于这四层之间的演变,请看下图:
当 StreamGraph 生成好了,则之前各种算子转换得到的 DataStream 就没用了。
Collection<Integer> inputIds = transform(transform.getInput());
设置ShuffleMode
设置并行度
\tgetStreamNode(edge.getSourceId()).addOutEdge(edge);
递归调用 input2 的 Transformation 处理完后才能处理后面
7
生成一个 StreamNode
设置输入类型
将该 StreamEdge 加入到该 StreamNode 的 inEdges 集合中
递归调用 input1 的 Transformation 处理完后才能处理后面
NTwoInputTransformation
streamGraph.setXXX
递归调用 input 的 Transformation 处理完后才能处理后面
如果没有设置 partitioner
alreadyTransformed.sizze()==0
由 算子 生成 Transformation 来构建 StreamGraph
获取执行结果
if(virtualSideOutputNodes.containsKey(upStreamVertexID))
if(alreadyTransformed.containsKey(transform)){return alreadyTransformed.get(transform);}
6
Class<? extends AbstractInvokable> invokableClass = operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
// 已经 Transform 的 Transformation 会放在这个集合中//关键实现:根据 transform 的类型,做相应不同的转换 //将当前 Transformation 转换成 StreamNode 和 StreamEdge,便于构建 SreamGraphCollection<Integer> transformedIds;
找到 该 StreamNode 的上游顶点
alreadyTransformed.clear(); alreadyTransformed = null; streamGraph = null; return builtStreamGraph;
Collection<Integer> inputIds1 = transform(transform.getInput1());
清空掉所有的 算子
给 上游 StreamNode 设置 出边
outEdges.add(outEdge);
Flink Job 的默认名称: Flink Streaming Job
* 获取 invokableClass, 这是非常重要的信息\t\t * 1、如果是 Source, 则执行: SourceStreamTask\t\t * 2、如果不是 Source, 则执行: OneInputStreamTask\t\t * 到底启动类是谁,当初在 client 中构建 StreamGraph 的时候就已经指定了。
else if(virtualPartitionNodes.containsKey(upStreamVertexID))
5
inEdges.add(inEdge);
transformedIds = transformSource((SourceTransformation<?>) transform);
添加一个 Operator(StreamGraph 端会添加一个 StreamNode)
if(partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {\t\t\t\tpartitioner = new ForwardPartitioner<Object>();\t\t\t} else if(partitioner == null) {\t\t\t\tpartitioner = new RebalancePartitioner<Object>();\t\t\t}
SourceTransformation
getStreamGraph(jobName) 获取 StreamGraph
1
this.transformations.clear();
for(Transformation<?> transformation : transformations) { //注释: 从 Env 对象中,把 Transformation 拿出来,然后转换成 StreamNode // 注释: Function --> Operator --> Transformation --> StreamNode transform(transformation); }
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
return Collections.singleton(source.getId());
添加 SourceTransformation 对应的 StreamNode
Y递归
final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
return streamGraph;
找到 该 StreamNode 的下游顶点
3
StreamGraph sg = getStreamGraph(jobName);
构建一个StreamGraph
获取 share group, 默认是 default
生成 StreamGraph
设置输出类型
execute(Graph) 执行 StreamGraph
if(shuffleMode == null) {\t\t\t\tshuffleMode = ShuffleMode.UNDEFINED;\t\t\t}
env.execute(\"Streaming WordCount\");
4
transform(transformation);
getStreamNode(edge.getTargetId()).addInEdge(edge);
final JobClient jobClient = executeAsync(streamGraph);
1、getStreamGraphGenerator() = StreamGraphGenerator 2、调用 generate() 方法 生成 StreamGragh
* 注释: 执行各种算子的 transformation: 由 算子 生成 Transformation 来构建 StreamGraph * 当时在执行各种算子的时候,就已经把算子转换成对应的 Transformation 放入 transformations 集合中了 * 自底向上(先遍历 input transformations) 对转换树的每个 transformation 进行转换
给 下游 StreamNode 设置 入边
Flink源码(1.11.x) Job 提交、部署流程源码分析之构建StreamGraph
StreamGraph.addOperator
SinkTransformation
添加边upStreamVertexID = upStreamNodeID
添加 SinkTransformation 对应的 StreamNode
将该 StreamEdge 加入到该 StreamNode 的 outEdges 集合中
* 当时在生成 StreamGraphGenerator 的时候,就已经把各种属性设置到 StreamGraphGenerator 中了\t\t * 现在把 StreamGraphGenerator 中的属性, 设置到 StreamGraph 中
创建 输入和输出 序列化器
else
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
List<Integer> allInputIds = new ArrayList<>();\t\tallInputIds.addAll(inputIds1);\t\tallInputIds.addAll(inputIds2);
StreamGraph.addCoOperator
执行 StreamGragh生成JobGragh
Collection<Integer> inputIds2 = transform(transform.getInput2());
alreadyTransformed = new HashMap<>();
添加一个 StreamEdge
添加 StreamEdge
return execute(sg);
return execute(DEFAULT_JOB_NAME);
getStreamGraphGenerator().setJobName(jobName)
0 条评论
下一页
为你推荐
查看更多