flink源码笔记
2021-06-28 16:01:59 40 举报
AI智能生成
Flink源码笔记主要记录了对Apache Flink的源代码进行深入理解和学习的过程。Flink是一个开源的流处理框架,用于处理无界和有界的数据流。在这份笔记中,详细记录了Flink的核心概念、数据处理模型、任务调度机制、内存管理策略等方面的实现细节。同时,还探讨了Flink与其他大数据处理框架(如Hadoop、Spark)的异同点以及在实际应用中的优劣势。通过对Flink源码的学习,可以更好地理解其设计理念和技术架构,为在实际项目中应用Flink提供有力支持。
作者其他创作
大纲/内容
flink
应用
数据清洗中的字符串转换
alibaba的fastjson
JSONObject
类图
分支主题
JSONArray
JSON.parse
源码
DataStream
transformation
StreamTransformation
outputType
typeUsed
get之后类似于不可变类型,get之前可以随意set
TypeInformation
之所以要抽象出这个类是因为不同类型的序列化手段和比较方式都不同;在join group过程中起作用
子类
CompositeType
方法
isKeyType
有一个不是keyType就不是keyType
全部都是keyType才是keyType
isSortKeyType
重写了equals
和canEqual
静态类
FlatFieldDescriptor
主要是封装了基本类型(非CompositeType的类型)和其keyPosition
TupleTypeInfoBase
构造器
属性
判断
子类型获取
getTypeAt
重写equals
TupleTypeInfo
fields最大不能超过25
getGenericParameters
返回值的Map的key是T0 T1等等
RowTypeInfo
默认的fieldName和TupleTypeInfo一致
可以自己提供fieldNames
toString
projectFields 静态方法
AtomicType
this.id = getNewNodeId();
通过反射获取
例子
不同的函数不一样
map函数
会getGenericInterfaces
然后遍历看哪个类型属于有泛型(ParameterizedType类型)且getRawType是MapFunction类型
然后getActualArguments[1]
返回即可
source
Function接口:用户通过继承该接口的不同子类来实现用户自己的数据处理逻辑,如上述中实现了SourceFunction这个子类,来实现从指定hostname和port来接收数据,并转发字符串的逻辑; StreamOperator接口:数据流操作符的基础接口,该接口的具体实现子类中,会有保存用户自定义数据处理逻辑的函数的属性,负责对userFunction的调用,以及调用时传入所需参数,比如在StreamSource这个类中,在调用SourceFunction的run方法时,会构建一个SourceContext的具体实例,作为入参,用于run方法中,进行数据的转发; StreamTransformation接口:该接口描述了构建一个DataStream的操作,以及该操作的并行度、输出数据类型等信息,并有一个属性,用来持有StreamOperator的一个具体实例; DataStream:描述的是一个具有相同数据类型的数据流,底层是通过具体的StreamTransformation来实现,其负责提供各种对流上的数据进行操作转换的API接口。
operation
union
union的datastream都必须是相同的返回值类型,否则会抛异常
union会把所有的transformation取出来放到一个ArrayList中,返回另一个DataStream并将UnionTransformation传入构造器中
map
通过反射获取返回类型
关联接口
RichFunction
提供open close生命周期方法,以及设置获取IterationRuntimeContext 或RuntimeContext的接口
AbstractRichFunction
对其进行了与context设置相关的最基本的实现
并没有复写open close生命周期方法
StreamMap
真正执行者,会把processElement delegate给 mapper
父类
OneInputStreamOperator
StreamOperator
主要是生命周期方法,和checkPoint方法
flatMap
关联类
StreamFlatMap
filter
StreamFilter
keyBy
BasicArrayTypeInfo或PrimitiveArrayTypeInfo
KeyedStream
timeWindow
WindowAssigner
SlidingEventTimeWindows
获取了最近一次窗口开始的时间戳
循环找出满足条件的所有窗口
triger
TrigerResult
window
实现类
TumblingEventTimeWindows
reduce
举例
PassThroughWindowFunction
WindowFunction
sum
sum函数也是一种特殊的reduce
addSource
只有继承了ParallelSourceFunction,isParallel才为true
如果想要并行的数据源,需要实现ParallelSourceFunction
默认并行度为1
相关类
StreamSource
SocketTextStreamFunction
run
cancel
addSink
split
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; }}); DataStream<Integer> evenStream = splitStream.select("even");DataStream<Integer> oddStream = splitStream.select("odd");evenStream.print();oddStream.print();
OutputSelector
上下游
shuffle
broadcast
reblance
轮询
rescale
global
Table api
StreamTableEnvironment
注册表
会调用registerTableInternal方法
名字不能重复,否则会报错
会存在rootSchema中
只有StreamTableSource可以注册
假如设置了行时间属性,就必须为eventTime类型
正则将逗号分隔的多个字段
调用registerDataStreamInternal
步骤
正常的列
时间列抽取,调整
包装成DataStreamTable
注册为table
select
sql api
sqlQuery
Calcite
parse
启动
生成Stream图
例子 flatMap
上下游的并行度一致的话就采用forward的模式,否则就会rebalance
例子source
slotSharingGroup
a、如果指定了分组名,则直接返回指定的值; b、如果没有指定分组名,则遍历输入的各个节点的分组名;b.1、如果所有输入的分组名都是一样的,则将这个一样的分组名作为当前节点的分组名; b.2、如果所有输入的分组名有不一样的,则返回默认分组名”default”;
例子keyBy
例子sink
StreamGraph
可以看出其构造函数中就是做了一些初始化的操作,给StreamGraph的各个属性设置初始值,都是一些空集合。 在获取到StreamGraphGenerator的实例后,继续看其generatorInternal方法的逻辑:
生成Job图
通过hash值生成id
如果用户对节点指定了一个散列值,则基于用户指定的值,产生一个长度为16的字节数组; 如果用户没有指定,则根据当前节点所处的位置,产生一个散列值,考虑的因素有:a、在当前StreamNode之前已经处理过的节点的个数,作为当前StreamNode的id,添加到hasher中; b、遍历当前StreamNode输出的每个StreamEdge,并判断当前StreamNode与这个StreamEdge的目标StreamNode是否可以进行链接,如果可以,则将目标StreamNode的id也放入hasher中,且这个目标StreamNode的id与当前StreamNode的id取相同的值; c、将上述步骤后产生的字节数据,与当前StreamNode的所有输入StreamNode对应的字节数据,进行相应的位操作,最终得到的字节数据,就是当前StreamNode对应的长度为16的字节数组。
uid —— 这个字段是用户设置的,用来在任务重启时,保障JobVertexID一致,一般是从之前的任务日志中,找出对应的值而设置的; userProvidedNodeHash —— 这个字段也是用户设置的,设置的用户自己产生的散列值。
串联简化操作
hashes和legacyHashes就是上面产生的每个StreamNode的ID对应的散列字节数组。 chainedOperatorHashes是一个map:其key是顺序链接在一起的StreamNode的起始那个StreamNode的ID,比如source->flatMap这个两个对应的StreamNode,在这个例子中,key的值就是source对应的id,为1; value是一个列表,包含了这个链上的所有操作符的散列值;这个列表中的每个元素是一个二元组,这个列表的值就是{[source的主hash,source的备用hash_1],[source的主hash,source的备用hash_2],[flatMap的主hash,flatMap的备用hash_1],…},对于这里的例子,列表中只有二个元素,为{[source的主hash,null],[flatMap的主hash,null]}
如果startNodeId已经被构建完成,则直接返回一个空集合; 如果还没有构建,则开始新的构建;显示递归构建链的下游节点,在下游节点都递归构建完成后,再构建当前节点; 如果当前节点是一个链的起始节点,则新建一个JobVertex,并将相关配置都通过StreamConfig提供的接口,配置到JobVertex的configuration属性中; 如果是链的中间节点,则将相关配置添加到其对应的StreamConfig对象中。
启动脚本分析
start-cluster.sh
jobmanager.sh
flink-daemon.sh
TMSlave start
config.sh
submitjob流程
handleMessage
监听模式
buildGraph
getVerticesSortedTopologicallyFromSources
addNodesThatHaveNoNewPredecessors
状态管理
reduceFunction
max
StreamGroupedReduce
同sum相同就是個reduceFunction
StreamTask
triggerCheckpoint->performCheckpoint->checkpointState,最终来到了checkpointingOperation。
在这里回调了每个StreamOperator的snapshotstate的方法
StreamOperator的接口里与checkpoint相关的接口方法
AbstractUdfStreamOperator
AbstractStreamOperator
AbstractStreamOperator是对StreamOperator的基础实现,在它的snapshotState方法中,分别调用了OperatorStateBackend和KeyedStateBackend的snapshot方法。特别注意,在调用这两个方法之前的snapshotState(snapshotContext)这个调用,它一方面实现了Raw的State的snapshot,一方面也实现了用户自定义的函数的State的更新。再说一下,后面的两个函数,snapshotState和initializeState,他们的形参都是一个context,是提供给用户来重新实现用户自己的state的checkpoint的。
StateBackend
DefaultOperatorStateBackend
非常长。。。。。。
本质上还是内存中的Map
List State本质上是 ArrayList
HeapKeyedStateBackend
performSnapshot 很长
StreamingRuntimeContext
AbstractKeyedState
是个hashmap
AbstractKeyedStateBackend
StateDiscriptor
watermark
kafka source
consumer.assignTimestampsAndWatermarks(new CanalEntryWatermark(TimeUnit.MINUTES.toMillis(1)));通过传入的
assignTimestamps
ExtractTimestampsOperator
假如比当前大就发出去,否则忽略
相当于增加了一个operator transformation
StreamTask的一个component
StreamInputProcessor
window内部组件(WindowOperator)
构造方法
也是OneInputStreamOperator
processElement
分配window
state
获取keyedState对应当前的key
windowState
以简单的内存HeapKeyedStateBackend为例
List
底层是ArrayList
Reducing
种类
TrigerContext
EventTimeTrigger
\\
WindowOperator的OnElement
清除窗口状态的代码逻辑
allowLateness
triger的触发时机
window.maxTimestamp() <= ctx.getCurrentWatermark()
drop的触发时机
综上,当allowlateness设置好后一个窗口可能会被多次触发
例子connect(stream2).process(new CoProcessFunction())
CoprocessedOperator
TwoInputStreamTask
StreamTwoInputPorcessor
JoinedStreasm.where(keySelector).equalTo(keySelector).window.apply()
最终转换成了coGroup
where.equalTo.window都一样属于复制粘贴的代码
然后会把他当成一简单的keyBy之后的window apply操作
如果trigger和evictor不存在也很简单,就会用默认提供的trigger,然而evictor是不必须的
JoinCoGroupoFunction的实现
双重循环,笛卡尔join
userfuction
全量 apply
增量 reduce
反压
LocalBufferPool
当Buffer满了之后自己就会被阻塞在向localBufferPool申请内存块的过程中
ui
getStackrace
最佳实践
配置获取
ParameterTool
api
载入
通过property文件载入
String propertiesFile = "/home/sam/flink/myjob.properties";ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
args
ParameterTool parameters = ParameterTool.fromArgs(args);
jvm参数
从系统属性获取您的配置值。启动JVM时,可以将系统属性传递给它:-Dinput=hdfs:///mydata。您也可以ParameterTool从这些系统属性初始化:
ParameterTool parameter = ParameterTool.fromSystemProperties();
传递
open方法中获取
全局参数,通过runtime获取
ParameterTool parameters = ParameterTool.fromArgs(args); // set up the execution environmentfinal ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameters);
特点
本身可序列化
TupleX
如果你往下游传入的参数较多,例如你有11个字段,不建议你使用Tuple11,建议你使用POJO(普通Java对象),而不是TupleX具有多个字段的数据类型。此外,使用POJO可以用于给大型Tuple命名,而不是显示的显示一大堆内容。例如下面:
DataStream<T> input = ...;// 基于event-time的滚动窗口input.assignTimestampsAndWatermarks(new AssignWatermark(Time.seconds(60))) .keyBy(<key 选择器>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<窗口函数>) .trigger(ContinuousEventTimeTrigger.of(<time>));
allowedLateness
默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。默认情况下,如果不指定allowedLateness,其值是0,简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。这里提一下window的触发条件:watermark < end-of-window + allowedLateness 之内到达的数据,都会被再次触发窗口的计算。注意只要晚到的数据,且满足条件是会持续不断触发窗口计算的!!
DataStream<T> input = ...;=// 基于event-time的滚动窗口input.assignTimestampsAndWatermarks(new AssignWatermark(Time.seconds(60))) .keyBy(<key 选择器>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(<time>); .<windowed transformation>(<窗口函数>);
注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。当数据还是晚于 allowedLateness怎么办?如果你不想丢弃这部分数据你可以使用Flink的side output功能时,你可以获取到因为晚到被丢弃的元素流。之后你可以针对此流做进一步处理。首先你需要在windowed流上通过sideOutputLateData(OutputTag)指明你想要获取晚到的元素,然后你就能在windowed operation的结果中获取到side-output流:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};DataStream<T> input = ...;DataStream<T> result = input .keyBy(<key 选择器>) .window(<窗口 分配器>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<窗口函数>);DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
其中,allowedLateness只针对Event Time有效;allowedLateness可用于TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,要注意这可能使得窗口再次被触发,相当于对前一次窗口的窗口的修正(累加计算或者累加撤回计算);要注意再次触发窗口时,UDF中的状态值的处理,要考虑state在计算时的去重问题。最后要注意的问题,就是sink的问题,由于同一个key的同一个window可能被sink多次,因此sink的数据库要能够接收此类数据。
延迟控制
默认情况下,元素不会逐个传输到每个算子的,如果来一个传输一个势必导致不必要的网络流量,flink缓存待发送到网络的元素。虽然这种方法对于优化吞吐量有好处,但是当输入流不够快时或者输入流过小的时候,它可能会导致延迟问题(因为要等待buffer慢的时候才会发送)。要控制吞吐量和延迟,你可以在execution environment(或单个operator)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填满的最大等待时间。如果超过该最大等待时间,即使缓冲区未满,也会被自动发送出去。该最大等待时间默认值为100 ms。除非你的应用对数据的实时性精确到毫秒,你可以调整这个参数。一般默认就好。
为了最大化吞吐量,可以设置setBufferTimeout(-1),这样就没有了超时机制,缓冲区只有在满时才会发送出去。为了最小化延迟,可以把超时设置为接近0的值(例如5或10 ms)。应避免将该超时设置为0,因为这样可能导致性能严重下降。
常用的配置
aggregator
0 条评论
回复 删除
下一页