flink源码笔记
2021-06-28 16:01:59 40 举报
AI智能生成
Flink源码笔记主要记录了对Apache Flink的源代码进行深入理解和学习的过程。Flink是一个开源的流处理框架,用于处理无界和有界的数据流。在这份笔记中,详细记录了Flink的核心概念、数据处理模型、任务调度机制、内存管理策略等方面的实现细节。同时,还探讨了Flink与其他大数据处理框架(如Hadoop、Spark)的异同点以及在实际应用中的优劣势。通过对Flink源码的学习,可以更好地理解其设计理念和技术架构,为在实际项目中应用Flink提供有力支持。
作者其他创作
大纲/内容
应用
数据清洗中的字符串转换
alibaba的fastjson
JSONObject
类图
分支主题
JSONArray
类图
分支主题
JSON.parse
public static Object parse(String text) { return parse(text, DEFAULT_PARSER_FEATURE); }
源码
DataStream
transformation
StreamTransformation
outputType
分支主题
分支主题
分支主题
分支主题
typeUsed
get之后类似于不可变类型,get之前可以随意set
TypeInformation
之所以要抽象出这个类是因为不同类型的序列化手段和比较方式都不同;在join group过程中起作用
子类
CompositeType
方法
isKeyType
分支主题
有一个不是keyType就不是keyType
全部都是keyType才是keyType
isSortKeyType
分支主题
重写了equals
分支主题
和canEqual
静态类
FlatFieldDescriptor
主要是封装了基本类型(非CompositeType的类型)和其keyPosition
分支主题
子类
TupleTypeInfoBase
构造器
分支主题
属性
分支主题
判断
分支主题
子类型获取
getTypeAt
分支主题
重写equals
分支主题
子类
TupleTypeInfo
构造器
分支主题
fields最大不能超过25
分支主题
fieldNames的名字是f0,f1等
方法
getGenericParameters
分支主题
返回值的Map的key是T0 T1等等
RowTypeInfo
构造器
分支主题
默认的fieldName和TupleTypeInfo一致
可以自己提供fieldNames
方法
toString
分支主题
projectFields 静态方法
分支主题
AtomicType
与CompositeType相对应,例如int,long等还有generic<Map>
分支主题
this.id = getNewNodeId();
分支主题
通过反射获取
例子
import java.lang.reflect.ParameterizedType;import java.lang.reflect.Type;import java.util.Arrays;public class GenericTypeTest { static class Test1 extends T<Person, Animal> { } static class Test2 implements I<Person, Animal>, I2<Fruit> { } public static void main(String[] args) { //获取类定义上的泛型类型 Test1 test1 = new Test1(); Type types = test1.getClass().getGenericSuperclass(); System.out.println(types); Type[] genericType = ((ParameterizedType) types).getActualTypeArguments(); for (Type t : genericType) { System.out.println(t.getTypeName()); } System.out.println("==============================================="); //获取接口定义上的泛型类型 Test2 test2 = new Test2(); //一个类可能实现多个接口,每个接口上定义的泛型类型都可取到 Type[] interfacesTypes = test2.getClass().getGenericInterfaces(); System.out.println(Arrays.asList(interfacesTypes)); for (Type t : interfacesTypes) { Type[] genericType2 = ((ParameterizedType) t).getActualTypeArguments(); for (Type t2 : genericType2) { System.out.println(t2.getTypeName()); } } }}class T<T1, T2> { public void printT(T1 t1, T2 t2) { System.out.println(t1.getClass()); System.out.println(t2.getClass()); }}interface I<T1, T2> {}interface I2<K> {}class Person { @Override public String toString() { return "Person Type"; }}class Animal { @Override public String toString() { return "Animal Type"; }}class Fruit { @Override public String toString() { return "Fruit Type"; }}
不同的函数不一样
map函数
会getGenericInterfaces
然后遍历看哪个类型属于有泛型(ParameterizedType类型)且getRawType是MapFunction类型
然后getActualArguments[1]
返回即可
source
分支主题
分支主题
分支主题
分支主题
Function接口:用户通过继承该接口的不同子类来实现用户自己的数据处理逻辑,如上述中实现了SourceFunction这个子类,来实现从指定hostname和port来接收数据,并转发字符串的逻辑; StreamOperator接口:数据流操作符的基础接口,该接口的具体实现子类中,会有保存用户自定义数据处理逻辑的函数的属性,负责对userFunction的调用,以及调用时传入所需参数,比如在StreamSource这个类中,在调用SourceFunction的run方法时,会构建一个SourceContext的具体实例,作为入参,用于run方法中,进行数据的转发; StreamTransformation接口:该接口描述了构建一个DataStream的操作,以及该操作的并行度、输出数据类型等信息,并有一个属性,用来持有StreamOperator的一个具体实例; DataStream:描述的是一个具有相同数据类型的数据流,底层是通过具体的StreamTransformation来实现,其负责提供各种对流上的数据进行操作转换的API接口。
operation
union
分支主题
union的datastream都必须是相同的返回值类型,否则会抛异常
union会把所有的transformation取出来放到一个ArrayList中,返回另一个DataStream并将UnionTransformation传入构造器中
map
分支主题
分支主题
通过反射获取返回类型
关联接口
RichFunction
提供open close生命周期方法,以及设置获取IterationRuntimeContext 或RuntimeContext的接口
子类
AbstractRichFunction
对其进行了与context设置相关的最基本的实现
并没有复写open close生命周期方法
StreamMap
分支主题
真正执行者,会把processElement delegate给 mapper
父类
OneInputStreamOperator
分支主题
父类
StreamOperator
分支主题
主要是生命周期方法,和checkPoint方法
flatMap
分支主题
关联类
StreamFlatMap
分支主题
filter
分支主题
关联类
StreamFilter
分支主题
keyBy
分支主题
BasicArrayTypeInfo或PrimitiveArrayTypeInfo
分支主题
分支主题
分支主题
分支主题
分支主题
KeyedStream
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
这种方法,相较于直接hash的方法在并行度发生改变的时候,可以减少rehash,效果类似于redis cluster的hash方法
a、先通过key的hashCode,算出maxParallelism的余数,也就是可以得到一个[0, maxParallelism)的整数; b、在通过公式 keyGroupId * parallelism / maxParallelism ,计算出一个[0, parallelism)区间的整数,从而实现分区功能。
timeWindow
分支主题
分支主题
WindowAssigner
分支主题
SlidingEventTimeWindows
分支主题
分支主题
获取了最近一次窗口开始的时间戳
循环找出满足条件的所有窗口
triger
分支主题
分支主题
分支主题
TrigerResult
分支主题
window
WindowAssigner
源码
分支主题
实现类
TumblingEventTimeWindows
分支主题
分支主题
reduce
分支主题
举例
PassThroughWindowFunction
分支主题
WindowFunction
分支主题
例子
分支主题
分支主题
分支主题
分支主题
分支主题
sum
分支主题
分支主题
分支主题
sum函数也是一种特殊的reduce
addSource
分支主题
分支主题
分支主题
只有继承了ParallelSourceFunction,isParallel才为true
如果想要并行的数据源,需要实现ParallelSourceFunction
默认并行度为1
相关类
StreamSource
分支主题
分支主题
分支主题
举例
分支主题
分支主题
分支主题
分支主题
分支主题
SocketTextStreamFunction
run
分支主题
cancel
分支主题
addSink
分支主题
分支主题
split
例子
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; }}); DataStream<Integer> evenStream = splitStream.select("even");DataStream<Integer> oddStream = splitStream.select("odd");evenStream.print();oddStream.print();
OutputSelector
分支主题
源码
分支主题
分支主题
上下游
shuffle
分支主题
broadcast
分支主题
reblance
分支主题
轮询
rescale
分支主题
global
分支主题
Table api
StreamTableEnvironment
注册表
registerTableSource(name: String, tableSource: TableSource[_]): Unit
分支主题
会调用registerTableInternal方法
分支主题
名字不能重复,否则会报错
会存在rootSchema中
只有StreamTableSource可以注册
假如设置了行时间属性,就必须为eventTime类型
registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit
分支主题
正则将逗号分隔的多个字段
调用registerDataStreamInternal
分支主题
步骤
分支主题
正常的列
分支主题
时间列抽取,调整
分支主题
包装成DataStreamTable
分支主题
注册为table
方法
select
sql api
sqlQuery
分支主题
Calcite
1 1. Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST;2 2. Sql Validator: 结合数字字典(catalog)去验证sql语法;3 3. 生成Logical Plan: 将sqlNode表示的AST转换成LogicalPlan, 用relNode表示;4 4. 生成 optimized LogicalPlan: 先基于calcite rules 去优化logical Plan,5 再基于flink定制的一些优化rules去优化logical Plan;6 5. 生成Flink PhysicalPlan: 这里也是基于flink里头的rules将,将optimized LogicalPlan转成成Flink的物理执行计划;7 6. 将物理执行计划转成Flink ExecutionPlan: 就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。
parse
分支主题
分支主题
分支主题
分支主题
分支主题
启动
生成Stream图
StreamGraphGenerator.generate(this, transformations);
分支主题
分支主题
分支主题
例子 flatMap
分支主题
分支主题
分支主题
分支主题
分支主题
上下游的并行度一致的话就采用forward的模式,否则就会rebalance
分支主题
分支主题
例子source
分支主题
分支主题
分支主题
分支主题
slotSharingGroup
分支主题
a、如果指定了分组名,则直接返回指定的值; b、如果没有指定分组名,则遍历输入的各个节点的分组名;b.1、如果所有输入的分组名都是一样的,则将这个一样的分组名作为当前节点的分组名; b.2、如果所有输入的分组名有不一样的,则返回默认分组名”default”;
例子keyBy
分支主题
分支主题
例子sink
分支主题
StreamGraph
分支主题
可以看出其构造函数中就是做了一些初始化的操作,给StreamGraph的各个属性设置初始值,都是一些空集合。 在获取到StreamGraphGenerator的实例后,继续看其generatorInternal方法的逻辑:
生成Job图
StreamingJobGraphGenerator.createJobGraph(this, jobID);
分支主题
分支主题
分支主题
通过hash值生成id
如果用户对节点指定了一个散列值,则基于用户指定的值,产生一个长度为16的字节数组; 如果用户没有指定,则根据当前节点所处的位置,产生一个散列值,考虑的因素有:a、在当前StreamNode之前已经处理过的节点的个数,作为当前StreamNode的id,添加到hasher中; b、遍历当前StreamNode输出的每个StreamEdge,并判断当前StreamNode与这个StreamEdge的目标StreamNode是否可以进行链接,如果可以,则将目标StreamNode的id也放入hasher中,且这个目标StreamNode的id与当前StreamNode的id取相同的值; c、将上述步骤后产生的字节数据,与当前StreamNode的所有输入StreamNode对应的字节数据,进行相应的位操作,最终得到的字节数据,就是当前StreamNode对应的长度为16的字节数组。
分支主题
uid —— 这个字段是用户设置的,用来在任务重启时,保障JobVertexID一致,一般是从之前的任务日志中,找出对应的值而设置的; userProvidedNodeHash —— 这个字段也是用户设置的,设置的用户自己产生的散列值。
分支主题
分支主题
串联简化操作
分支主题
分支主题
hashes和legacyHashes就是上面产生的每个StreamNode的ID对应的散列字节数组。 chainedOperatorHashes是一个map:其key是顺序链接在一起的StreamNode的起始那个StreamNode的ID,比如source->flatMap这个两个对应的StreamNode,在这个例子中,key的值就是source对应的id,为1; value是一个列表,包含了这个链上的所有操作符的散列值;这个列表中的每个元素是一个二元组,这个列表的值就是{[source的主hash,source的备用hash_1],[source的主hash,source的备用hash_2],[flatMap的主hash,flatMap的备用hash_1],…},对于这里的例子,列表中只有二个元素,为{[source的主hash,null],[flatMap的主hash,null]}
如果startNodeId已经被构建完成,则直接返回一个空集合; 如果还没有构建,则开始新的构建;显示递归构建链的下游节点,在下游节点都递归构建完成后,再构建当前节点; 如果当前节点是一个链的起始节点,则新建一个JobVertex,并将相关配置都通过StreamConfig提供的接口,配置到JobVertex的configuration属性中; 如果是链的中间节点,则将相关配置添加到其对应的StreamConfig对象中。
分支主题
分支主题
启动脚本分析
start-cluster.sh
jobmanager.sh
flink-daemon.sh
分支主题
分支主题
分支主题
TMSlave start
分支主题
config.sh
分支主题
submitjob流程
handleMessage
监听模式
public enum ListeningBehaviour { DETACHED, // only receive the Acknowledge message about the job submission message EXECUTION_RESULT, // receive additionally the SerializedJobExecutionResult EXECUTION_RESULT_AND_STATE_CHANGES // receive additionally the JobStatusChanged messages}
分支主题
分支主题
buildGraph
getVerticesSortedTopologicallyFromSources
addNodesThatHaveNoNewPredecessors
状态管理
例子
sum
分支主题
reduceFunction
分支主题
分支主题
分支主题
max
分支主题
分支主题
分支主题
StreamGroupedReduce
同sum相同就是個reduceFunction
StreamTask
分支主题
分支主题
分支主题
triggerCheckpoint->performCheckpoint->checkpointState,最终来到了checkpointingOperation。
分支主题
分支主题
分支主题
在这里回调了每个StreamOperator的snapshotstate的方法
StreamOperator的接口里与checkpoint相关的接口方法
分支主题
分支主题
AbstractUdfStreamOperator
分支主题
分支主题
分支主题
分支主题
分支主题
AbstractStreamOperator
分支主题
AbstractStreamOperator是对StreamOperator的基础实现,在它的snapshotState方法中,分别调用了OperatorStateBackend和KeyedStateBackend的snapshot方法。特别注意,在调用这两个方法之前的snapshotState(snapshotContext)这个调用,它一方面实现了Raw的State的snapshot,一方面也实现了用户自定义的函数的State的更新。再说一下,后面的两个函数,snapshotState和initializeState,他们的形参都是一个context,是提供给用户来重新实现用户自己的state的checkpoint的。
分支主题
StateBackend
分支主题
DefaultOperatorStateBackend
非常长。。。。。。
本质上还是内存中的Map
List State本质上是 ArrayList
HeapKeyedStateBackend
分支主题
performSnapshot 很长
分支主题
StreamingRuntimeContext
分支主题
分支主题
分支主题
分支主题
AbstractKeyedState
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
是个hashmap
AbstractKeyedStateBackend
StateDiscriptor
分支主题
Keyed State:ValueState<T>:保持一个可以更新和获取的值(每个Key一个value),可以用来update(T)更新,用来T value()获取。ListState<T>: 保持一个值的列表,用add(T) 或者 addAll(List<T>)来添加,用Iterable<T> get()来获取。ReducingState<T>: 保持一个值,这个值是状态的很多值的聚合结果,接口和ListState类似,但是可以用相应的ReduceFunction来聚合。AggregatingState<IN, OUT>:保持很多值的聚合结果的单一值,与ReducingState相比,不同点在于聚合类型可以和元素类型不同,提供AggregateFunction来实现聚合。FoldingState<T, ACC>: 与AggregatingState类似,除了使用FoldFunction进行聚合。MapState<UK, UV>: 保持一组映射,可以将kv放进这个状态,使用put(UK, UV) or putAll(Map<UK, UV>)添加,或者使用get(UK)获取。
watermark
kafka source
分支主题
调用了sourceContext.collectWithTimestamp(record, timestamp);这个方法
分支主题
分支主题
consumer.assignTimestampsAndWatermarks(new CanalEntryWatermark(TimeUnit.MINUTES.toMillis(1)));通过传入的
assignTimestamps
分支主题
ExtractTimestampsOperator
分支主题
分支主题
假如比当前大就发出去,否则忽略
相当于增加了一个operator transformation
StreamTask的一个component
StreamInputProcessor
分支主题
window
window内部组件(WindowOperator)
构造方法
分支主题
也是OneInputStreamOperator
processElement
分支主题
分支主题
分配window
分支主题
state
分支主题
获取keyedState对应当前的key
分支主题
windowState
以简单的内存HeapKeyedStateBackend为例
分支主题
List
分支主题
底层是ArrayList
Reducing
分支主题
分支主题
分支主题
分支主题
种类
分支主题
TrigerContext
分支主题
分支主题
EventTimeTrigger
分支主题
\
分支主题
假如要是fire的话就会调用process方法,否则就继续执行
watermark
AbstractStreamOperator
分支主题
分支主题
分支主题
WindowOperator的OnElement
分支主题
清除窗口状态的代码逻辑
分支主题
分支主题
分支主题
allowLateness
分支主题
分支主题
分支主题
triger的触发时机
window.maxTimestamp() <= ctx.getCurrentWatermark()
drop的触发时机
分支主题
分支主题
综上,当allowlateness设置好后一个窗口可能会被多次触发
例子connect(stream2).process(new CoProcessFunction())
分支主题
分支主题
CoprocessedOperator
分支主题
分支主题
TwoInputStreamTask
分支主题
StreamTwoInputPorcessor
分支主题
JoinedStreasm.where(keySelector).equalTo(keySelector).window.apply()
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
分支主题
最终转换成了coGroup
分支主题
分支主题
where.equalTo.window都一样属于复制粘贴的代码
分支主题
首先把对象封装成用TaggedUnion包装的类,这样两者数据类型一致就可以进行union了
分支主题
然后会把他当成一简单的keyBy之后的window apply操作
分支主题
分支主题
如果trigger和evictor不存在也很简单,就会用默认提供的trigger,然而evictor是不必须的
JoinCoGroupoFunction的实现
分支主题
双重循环,笛卡尔join
userfuction
全量 apply
分支主题
增量 reduce
分支主题
反压
LocalBufferPool
分支主题
分支主题
当Buffer满了之后自己就会被阻塞在向localBufferPool申请内存块的过程中
ui
分支主题
分支主题
getStackrace
分支主题
分支主题
最佳实践
配置获取
ParameterTool
api
/**parse parameters*/public static void main(String[] args) {ParameterTool params = ParameterTool.fromArgs(args);String input = params.getRequired("input");String elements = params.getRequired("elements");...params.get("output", "myDefaultValue");params.getLong("expectedCount", -1L);params.getNumberOfParameters()// .. regular code ..
载入
通过property文件载入
String propertiesFile = "/home/sam/flink/myjob.properties";ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
args
ParameterTool parameters = ParameterTool.fromArgs(args);
jvm参数
从系统属性获取您的配置值。启动JVM时,可以将系统属性传递给它:-Dinput=hdfs:///mydata。您也可以ParameterTool从这些系统属性初始化:
ParameterTool parameter = ParameterTool.fromSystemProperties();
传递
open方法中获取
ParameterTool parameters = ParameterTool.fromArgs(args);DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { @Override public void open(Configuration parameters) throws Exception { parameters.getInteger("myInt", -1); // .. do
全局参数,通过runtime获取
ParameterTool parameters = ParameterTool.fromArgs(args); // set up the execution environmentfinal ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameters);
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { @Override public void open(Configuration parameters) throws Exception { ParameterTool parameters = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); parameters.getRequired("input"); // .. do more ..
特点
本身可序列化
TupleX
如果你往下游传入的参数较多,例如你有11个字段,不建议你使用Tuple11,建议你使用POJO(普通Java对象),而不是TupleX具有多个字段的数据类型。此外,使用POJO可以用于给大型Tuple命名,而不是显示的显示一大堆内容。例如下面:
CustomType var = new ...; public static class CustomType extends Tuple11<String, String, ..., String> { // constructor matching super}
window
但是:使用窗口函数的窗口 transformation不如其他情况高效,因为Flink必须在调用函数之前在内部缓冲窗口中的所有元素。窗口函数获取包含窗口所有元素的Iterable。不过这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而要在内部缓冲,直到窗口就绪才能进行处理。解决方案:可以通过将窗口函数与ReduceFunction或FoldFunction相结合来进行缓解,(也就是来一个数据计算一次)以获得窗口元素的增量聚合和窗口函数接收到的其他窗口元数据。在Flink1.3中FoldFunction已经被抛弃。推荐使用aggreator的方式对窗口进行增量聚合。也就是在 windowWindowedStream.aggregate(...,...);
DataStream<T> input = ...;=// 基于event-time的滚动窗口input .keyBy(<key 选择器>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(new .aggregate(new AggregateFunction<String,LongCounter,Long>(){ @Override public LongCounter createAccumulator() { return new LongCounter(); //定义一个累加器的类型, } @Override public void add(String value, LongCounter accumulator) { accumulator.add(Long.valueof(value)) } @Override public Long getResult(LongCounter accumulator) { return accumulator.getLocalValue(); } @Override public LongCounter merge(LongCounter a, LongCounter b) { a.merge(b); return a; }},new WindowFunction<Long, String, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<Long> out) throws Exception { out.collect(key+":" + input.iterator.next()) }});
上面的代码的意思是,首先AggregateFunction负责进行每来一条数据就进行计算的逻辑。内部使用了flink的state编程模型。其中LongCounter是flink自带的累加器。方法:createAccumulator,定义了存在state中的数据类型,数据每来一条都将调用add方法进行计算,然后调用merge来合并数据存回state,最后窗口触发的时候调用getResult方法输出结果。最后的结果进入WindowFunction,这时的WindowFunction里面Iterable<Long> input,其实只包含一个元素。就是AggregateFunction的计算结果。考虑这样的场景,我们定义一个大小为5分钟,甚至是1h的窗口,我们期望的结果是,5m的聚合结果或者是1h的聚合结果。但通常情况下,我不想等待这么久才得到结果怎么办?也就是说我想要一个增量的结果。flink提供一个名为ContinuousEventTimeTrigger的窗口触发器,该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first状态标识为false。概念我翻译一下,就是你可以定义一个间隔的触发时间,该触发器周期的触发窗口计算,输出增量结果。使用方法如下:
DataStream<T> input = ...;// 基于event-time的滚动窗口input.assignTimestampsAndWatermarks(new AssignWatermark(Time.seconds(60))) .keyBy(<key 选择器>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<窗口函数>) .trigger(ContinuousEventTimeTrigger.of(<time>));
allowedLateness
默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。默认情况下,如果不指定allowedLateness,其值是0,简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。这里提一下window的触发条件:watermark < end-of-window + allowedLateness 之内到达的数据,都会被再次触发窗口的计算。注意只要晚到的数据,且满足条件是会持续不断触发窗口计算的!!
DataStream<T> input = ...;=// 基于event-time的滚动窗口input.assignTimestampsAndWatermarks(new AssignWatermark(Time.seconds(60))) .keyBy(<key 选择器>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(<time>); .<windowed transformation>(<窗口函数>);
注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。当数据还是晚于 allowedLateness怎么办?如果你不想丢弃这部分数据你可以使用Flink的side output功能时,你可以获取到因为晚到被丢弃的元素流。之后你可以针对此流做进一步处理。首先你需要在windowed流上通过sideOutputLateData(OutputTag)指明你想要获取晚到的元素,然后你就能在windowed operation的结果中获取到side-output流:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};DataStream<T> input = ...;DataStream<T> result = input .keyBy(<key 选择器>) .window(<窗口 分配器>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<窗口函数>);DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
其中,allowedLateness只针对Event Time有效;allowedLateness可用于TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,要注意这可能使得窗口再次被触发,相当于对前一次窗口的窗口的修正(累加计算或者累加撤回计算);要注意再次触发窗口时,UDF中的状态值的处理,要考虑state在计算时的去重问题。最后要注意的问题,就是sink的问题,由于同一个key的同一个window可能被sink多次,因此sink的数据库要能够接收此类数据。
延迟控制
默认情况下,元素不会逐个传输到每个算子的,如果来一个传输一个势必导致不必要的网络流量,flink缓存待发送到网络的元素。虽然这种方法对于优化吞吐量有好处,但是当输入流不够快时或者输入流过小的时候,它可能会导致延迟问题(因为要等待buffer慢的时候才会发送)。要控制吞吐量和延迟,你可以在execution environment(或单个operator)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填满的最大等待时间。如果超过该最大等待时间,即使缓冲区未满,也会被自动发送出去。该最大等待时间默认值为100 ms。除非你的应用对数据的实时性精确到毫秒,你可以调整这个参数。一般默认就好。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();env.setBufferTimeout(timeoutMillis); // 设置这个参数env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); // 设置这个参数
为了最大化吞吐量,可以设置setBufferTimeout(-1),这样就没有了超时机制,缓冲区只有在满时才会发送出去。为了最小化延迟,可以把超时设置为接近0的值(例如5或10 ms)。应避免将该超时设置为0,因为这样可能导致性能严重下降。
常用的配置
private static StreamExecutionEnvironment getExecutionEnvironment(ParameterTool params) throws IOException { //int parallelism = params.getInt("parallelism", 1); //获取流处理的执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //使用rockdb,并使用增量检查点的方式 env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop-meituan/user/hadoop-launcher/flink/checkpoints", true)); //设置每6秒触发一次检查点,检查点是异步的,当腰读取kafka的时候,这个时间间隔6-10s为宜。 env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE); //设置并行度 //env.setParallelism(parallelism); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //这是检查点的最大个数 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //设置operate的缓冲时间,The default value for this timeout is 100 ms. env.setBufferTimeout(100); //设置job的全局参数 env.getConfig().setGlobalJobParameters(params); return env;}
aggregator
public class PreAggregateFunction implements AggregateFunction<MultiValueMetric, Map<String, Accumulator>, Map<String, List<Double>>> { @Override public Map<String, Accumulator> createAccumulator() { return new HashMap<>(); } @Override public void add(MultiValueMetric strData, Map<String, Accumulator> accumulatorMap) { String metric = strData.getMetric(); String type = strData.getType(); String key = metric + "," + type + "," + strData.getTime_granularity() +"," +strData.getTags() +"," + strData.getAppName(); //原始数据中的值 List<Double> currentValues = strData.getValues(); if (accumulatorMap.containsKey(key)) { Accumulator valueMap = accumulatorMap.get(key); for (Double currentValue : currentValues) { valueMap.add(currentValue); } } else { Accumulator accumulator = null; if (type.equals("counter")) { accumulator = new DoubleCounter(); } else if (type.equals("max")) { accumulator = new DoubleMaximum(); } else if (type.equals("mix")) { accumulator = new DoubleMinimum(); } else if (type.equals("avg")) { accumulator = new AverageAccumulator(); } else if (type.equals("timer")) { accumulator = new ListAccumulator<>(); } if (accumulator != null) { for (Double currentValue : currentValues) { accumulator.add(currentValue); } accumulatorMap.put(key, accumulator); } } } @Override public Map<String, List<Double>> getResult(Map<String, Accumulator> accumulator) { Map<String, List<Double>> result = new HashMap<>(); for (Map.Entry<String, Accumulator> entry : accumulator.entrySet()) { List<Double> values = new ArrayList<>(); String key = entry.getKey(); String type = key.split(",")[1]; if ((type.equals("timer")) && entry.getValue() instanceof ListAccumulator) { values = (ArrayList) entry.getValue().getLocalValue(); result.put(key, values); } else { values.add((double) entry.getValue().getLocalValue()); result.put(key, values); } } return result; } /** * Merge two collections of accumulators. The second will be merged into the first. * * @param target The collection of accumulators that will be updated * @param toMerge The collection of accumulators that will be merged into the other */ @Override public Map<String, Accumulator> merge(Map<String, Accumulator> target, Map<String, Accumulator> toMerge) { for (Map.Entry<String, Accumulator> otherEntry : toMerge.entrySet()) { Accumulator ownAccumulator = target.get(otherEntry.getKey()); if (ownAccumulator == null) { // Create initial counter (copy!) target.put(otherEntry.getKey(), otherEntry.getValue().clone()); } else { // Both should have the same type AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(), ownAccumulator.getClass(), otherEntry.getValue().getClass()); // Merge target counter with other counter ownAccumulator.merge(otherEntry.getValue()); } } return target; }}
0 条评论
下一页