Flink
2021-01-17 03:28:26 1 举报
AI智能生成
flink总结
作者其他创作
大纲/内容
(一)、flink简介
架构图
子主题
内存模型
rpc机制
(二)、flink部署
yarn
Session-Cluster
子主题
Per-Job-Cluster
Mesos
k8s
standalone模式
(三)、flink运行时架构
运行组件
作业管理器 JobManager
资源管理器 ResourceManager
分发器 Dispatcher
Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。
Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。
jobMaster
JobMaster和具体的 Job 是一一对应的,多个 Job 可以同时运行在一个 Flink 集群中, 每个 Job 都有一个自己的 JobMaster。
在作业提交时,JobMaster 会先接收到要执行的应用。这里所说“应用”一般是客户端提交来的,包括:Jar 包,数据流图(dataflow graph),和作业图(JobGraph)。
JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”
( ExecutionGraph ), 它包含了所有可以并发执行的任务。 JobMaster 会向资源管理器
(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。
而在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”
( ExecutionGraph ), 它包含了所有可以并发执行的任务。 JobMaster 会向资源管理器
(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。
而在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
任务管理器 TaskManager
任务提交流程
执行图
StreamGraph 逻辑流图
jobGraph 作业图
从StreamGraph 逻辑流图到jobGraph 作业图 做的主要事情是operator chain优化
ExecutionGraph 执行图
物理执行图
执行图流程
并行度
设置全局并行度 env.setParallelism(2);
设置算子并行度 wordCount.print().setParallelism(1);
提交应用时设置
配置文件中设置
operator chain触发策略
1、数据传输策略是 forward strategy
2、在同一个 TaskManager 中运行
flink任务提交
(四)、flink流处理API
获取执行环境 Environment
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
Source
从集合中读取数据
直接读取
从文件中读取
从Kafka中获取
从自定义的Source中 读取
Transform算子
map算子
flatMap算子
filter算子
keyBy算子
聚合算子
这些算子针对于KeyedStream的每一个支流做聚合。例如:sum()、min()、max()、minBy()和maxBy()。
reduce算子
split算子和select算子
connect算子和coMap算子
union算子
UDF函数
函数类
其实在我们常用的算子如map、filter等都暴露了对应的接口。例如:MapFunction, FilterFunction, ProcessFunction 等等。可以自定义实现
匿名函数
val flinkTweets = tweets.filter(_.contains("flink"))
富函数
富函数是DataStream API提供的一个函数类的接口,所有Flink函数都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。例如:RichMapFunction、RichFlatMapFunction和RichFilterFunction等。
富函数有一个生命周期的概念。典型的生命周期方法有:
①open()方法是富函数的初始化方法,当一个算子例如map或者filter被调用之前该
方法会被调用
方法会被调用
②close()方法是生命周期中的最后一个调用方法,做一些清理工作。
③getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态。
创建MyMapper
创建MyMapper
Sink
参考九connectors
(五)、flink中的window
窗口推荐文章
官网链接
window的一般操作流程
窗口化的Flink程序的一般结构如下,第一个代码段中是分组的流,而第二段是非分组的流。正如我们所见,唯一的区别是分组的stream调用keyBy(…)和window(…),而非分组的stream中window()换成了windowAll(…)
Keyed Windows
Non-Keyed Windows
在上面的例子中,方括号[]内的命令是可选的,这表明Flink允许你根据最符合你的要求来定义自己的window逻辑
Window 概述
streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的” buckets” 桶, 我们可以在这些桶上做计算操作。
窗口分配器(Window Assingers)
Window Assinger是干啥的
当你决定stream是否keyby之后,window是没有构建的,你还需要指定一个window Assinger用于定义元素如何分配到窗口中
window Assinger的作用:负责将每个传入的元素分配给一个或多个窗口
Flink 提供了一些常用的预定义窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。你也可以通过继承WindowAssigner类来自定义自己的窗口。
源码中滚动、滑动、会话等窗口都继承了windowAssigner
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {}
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {}
指定完你的数据流是分组的还是非分组的之后,接下来你需要定义一个窗口分配器(window assigner),窗口分配器定义了元素如何分配到窗口中,这是通过在分组数据流中调用window(...)或者非分组数据流中调用windowAll(...)时你选择的窗口分配器(WindowAssigner)来指定的
Window Assingers 类型
CountWindow(计数窗口)
按照指定的数据条数生成一个 Window,与时间无关。
滚动计数窗口
滑动计数窗口
TimeWindow(时间窗口)
1. 滚动窗口(Tumbling Windows)
滚动事件时间窗口( tumbling event-time windows )
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
滚动处理时间窗口(tumbling processing-time windows)
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
每日偏移8小时的滚动事件时间窗口(daily tumbling event-time windows offset by -8 hours. )
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
2. 滑动窗口(Sliding Windows)
滑动事件时间窗口
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
滑动处理时间窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
偏移8小时的滑动处理时间窗口(sliding processing-time windows offset by -8 hours)
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
3. 会话窗口(Session Windows)
事件时间会话窗口(event-time session windows)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
处理时间会话窗口(processing-time session windows)
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
窗口函数(Window Functions)
增量聚合函数( incremental aggregation functions)
全窗口函数( full window functions)
热门商品统计topN代码链接
(六)、flink时间语义与watermark
时间语义
Event Time:事件时间
Ingestion Time:数据进入flink的时间
Processing Time:算子计算时间
EventTime 的引入
EventTime 的引入方式
Watermark
基本概念
watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性
通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。此时就是watermark发挥作用了,它表示当达到watermark到达之后,在watermark之前的数据已经全部达到(即使后面还有延迟的数据)
watermark引入
dataStream.assignTimestampsAndWatermarks( new
BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.millisecond s(1000)) {
override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000
}
} )
BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.millisecond s(1000)) {
override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000
}
} )
watermark面试陈述
1、介绍三种时间语义概念
2、watermark由来
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
3、什么是Watermark
Watermark被称为水位线,是一种衡量事件时间(Event time)进度的机制,本质上是一种时间戳,可以在读取 Source时候指定或者在transformation操作之前,用Watermark生成器按照需求指定。
特点:
Watermark是一种衡量事件时间(Event time)进度的机制,本质上是一种时间戳。
Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。也就是当最大的事件时间maxEventTime小于Watermark之前,window仍然可以接收数据,根据Event time将其分配到指定的窗口。
Watermark也可以理解成一个延迟触发机制,每个Watermark都可以设置一个延时时长,表示事件延迟触发的程度。假设Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
Flink对乱序数据的三重保障
Watermark
能够保证迟到很短的时间的数据到来后(一般是迟到毫秒级别内的数据,最大不超过1s),触发窗口关闭并输出。(即能够hold住短时间内迟到的数据)
allowedLateness
如果设置allowedLateness,比如设置2min,这样Watermark时间到之后会先输出一个近似的结果,然后在2min内来一个数据更新一次结果,等待时间2min到了之后,关闭窗口。
sideOutputLateData
再之后迟到的数据,可以通过sideOutputLateData(侧输出流)来兜底。
(七)、processFunction API(low-level API)
ProcessFunction API其实是Flink底层的(low-level)API。
StreamData除了为我们提供了我们之前介绍过的高层转换算子还为我们提供了底层转换算子。与高层转换算子不同,通过这些底层转换算子我问可以访问数据的时间戳、watermark、ontimer定时器以及注册定时事件等等。
Flink为我们提供了8中process function:
ProcessFunction
KeyedProcessFunction
示例链接
重在实战,其他几个使用方法大同小异
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
(八)、状态编程和容错机制
state
operation state和keyed state官网链接,有使用案例
operator state
Flink 为算子状态提供三种基本数据结构:
列表状态(List state)
val descriptor = new ListStateDescriptor[(String, Long)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
联合列表状态(Union list state)
广播状态(Broadcast state)
keyed state
Flink 的 Keyed State 支持以下数据类型
ValueState[T]保存单个的值,值的类型为 T。
ListState[T]保存一个列表,列表里的元素的数据类型为 T。
MapState[K, V]保存 Key-Value 对。
ReducingState[T]
AggregatingState[I, O]
state backend 状态数据存储位置
用来保存 State 的存储后端就叫做StateBackend。StateBackend 默认是保存在 JobManager 的内存中,也可以保存在 本地文件系统 或者 HDFS 分布式文件系统中
存储类型
MemoryStateBackend
缺点
状态数据有可能会丢失
只能保存数据量小的状态
优点
开发测试很方便
默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到
JobManager 的堆内存中。
JobManager 的堆内存中。
FsStateBackend
缺点
状态大小受TaskManager内存限制(默认支持5M)
优点
状态访问速度很快
状态信息不会丢失
状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS
等文件系统)
等文件系统)
RocksDBStateBackend
缺点
状态访问速度有所下降
优点
可以存储超大量的状态信息
状态信息不会丢失
用于: 生产,可以存储超大量的状态信息
状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
env.setStateBackend(...);
checkpoint
Checkpoint通过允许从状态和相应流的位置进行恢复,从而使Flink中的状态具备容错能力,从而使应用程序具有与无故障执行相同的语义。
checkpoint设置
//启动检查点 设置checkpoint的周期,建议每5分钟checkpoint一次,下面的单位是毫秒
env.enableCheckpointing(30000)
env.enableCheckpointing(30000)
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//设置状态后端
env.setStateBackend(new FsStateBackend("hdfs:///user/fts/flink/checkpoint"))
env.setStateBackend(new FsStateBackend("hdfs:///user/fts/flink/checkpoint"))
// 同一时间只允许进行一个检查点
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//系统异常退出或人为 Cancel 掉,不删除checkpoint数据
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
恢复数据
从checkpoint恢复数据,启动时指定参数-s
flink run -s
hdfs://192.168.123.102:9000/flink/checkpoint/815141e4bb2c353f791c8fd856060e2b/ch
k-81/_metadata -c com.nx.streaming.lesson05.TestCheckpoint original-nx-flink-
state-1.0-SNAPSHOT.jar --hostname 192.168.123.102 --port 9999
hdfs://192.168.123.102:9000/flink/checkpoint/815141e4bb2c353f791c8fd856060e2b/ch
k-81/_metadata -c com.nx.streaming.lesson05.TestCheckpoint original-nx-flink-
state-1.0-SNAPSHOT.jar --hostname 192.168.123.102 --port 9999
重启策略
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策
略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会
覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数
restart-strategy 定义了哪个策略被使用。
略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会
覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数
restart-strategy 定义了哪个策略被使用。
常用的重启策略
固定延迟重启策略 (Fixed delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 5表示最大重试次数为5次,10s为延迟时间
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(10, TimeUnit.SECONDS)));
// 5表示最大重试次数为5次,10s为延迟时间
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(10, TimeUnit.SECONDS)));
失败率 (Failure rate)
第一种:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 3为最大失败次数;5min为测量的故障时间;10s为2次间的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
// 3为最大失败次数;5min为测量的故障时间;10s为2次间的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none
restart-strategy: none
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setRestartStrategy(RestartStrategies.noRestart());
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在
应用代码中动态指定,会覆盖全局配置。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在
应用代码中动态指定,会覆盖全局配置。
SavePoint
savepoint的使用
1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定
Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定
Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:触发一个savepoint【直接触发或者在cancel的时候触发】
停止程序:bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn
模式需要指定-yid参数】
停止程序:bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn
模式需要指定-yid参数】
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]
bin/flink run -s savepointPath [runArgs]
(九)、flink connectors
kafka
hdfs
BucketingSink
StreamFileSink
MySQL
hbase
Redis
(十)、Table API 与SQL
(十一)flink常见面试题与汇总
概念
flink序列化过程
flink job提交流程
flink task之间数据交互如何保证高性能的
子主题
flink内存模型
子主题
框架堆内存(Framework Heap Memory)
taskmanager.memory.framework.heap.size
用于 Flink 框架的 JVM 堆内存(进阶配置)
任务堆内存(Task Heap Memory)
taskmanager.memory.task.heap.size
用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory)
taskmanager.memory.managed.size
taskmanager.memory.managed.fraction
taskmanager.memory.managed.fraction
由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
框架堆外内存(Framework Off-heap Memory)
taskmanager.memory.framework.off-heap.size
用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。
任务堆外内存(Task Off-heap Memory)
taskmanager.memory.task.off-heap.size
用于 Flink 应用的算计及用户代码的堆外内存(直接内存或本地内存)。
网络内存(Network Memory)
taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
taskmanager.memory.network.max
taskmanager.memory.network.fraction
用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。
JVM Metaspace
taskmanager.memory.jvm-metaspace.size
Flink JVM 进程的 Metaspace。
JVM 开销
taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction
taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction
用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。
Flink的三种时间语义是什么?(NX)
Flink集群有哪些角色?各自有什么作用?(SGG-1.14.7)
作业管理器(JobManager)
资源管理器(ResourceManager)
任务管理器(TaskManager)
分发器(Dispatcher)
Flink场景的重启策略有哪几种?(NX)
说说Flink的状态存储(NX)
说说flink的三层模型(NX)
checkpoint机制
Flink 实现容错主要靠一致性检查点CheckPoint机制和State机制来实现。一致性检查点Checkpoint 负责定时制作分布式快照、对程序中某个时间点的状态进行备份,这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候;State 用来存储,计算过程中的中间状态。
checkpoint机制主要靠chandy-labport算法实现的
是将检查点的保存和流式作业数据处理分开,不需要暂停整个应用,这里引入状态后端的概念,状态保存在状态后端中
需要介绍一个概念Checkpoint Barrier
flink的检查点算法利用到了一个,检查点分界线(barrier),用来把一条流上的数据按照不同的检查点分开
分界线之前到来的数据导致的状态变更,都会被包含在当前分界线所属的检查点中;而分界线之后的数据导致的所有变更,就会被包含在之后的检查点中
例子
1、两条流使用2个source来读取数据
2、jobmanager会向每个source任务发送一个带有检查点id的消息,启动检查点(假设ID=2)
3、source将读取的数据偏移量状态写入检查点id为2的状态后端中,并发送一个barrier(检查点屏障–对应的id也为2);
状态后端保存完成会通知source任务,source任务会向jobmanager确认检查点完成(此处为异步操作)
状态后端保存完成会通知source任务,source任务会向jobmanager确认检查点完成(此处为异步操作)
4、每个source的barrier–id=2会向下游传递,当下游算子收到所有的barrier,算子会将处理数据的结果,缓存到状态后端中,barrier会接着向下传递
5、直到sink向jobmanager确认,状态保存到状态后端,此次的cp就正式完成了,这就是flink的分布式快照实现。
checkpoint的原理使用的是chandy-labport 算法
说说Flink任务的运行流程(NX)
Flink是如何处理反压的(NX)
重启策略有哪几种?
分区策略
架构
Operator Chains(算子链)这个概念你了解吗(NX)
深入浅出flink1
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。
Flink 中水印是什么概念,起到什么作用?(NX)
深入浅出flink5
说说Flink的窗口(NX)
Flink相比SparkStreaming有什么区别
flink的map和spark的map的区别
场景
flink kafka保持exactly once
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义,详说一下两阶段提交过程
数据倾斜
Flink怎么去重?考虑一个实时场景:双十一场景,滑动窗口长度为 1 小时, 滑动距离为 10 秒钟,
亿级用户,怎样计算 UV?(NX)
亿级用户,怎样计算 UV?(NX)
双流场景下,flink的map会同时包含两个流吗
任务延迟高, 优化思路?
出现了反压,从反压角度去思考
slot&并行度
并行度
操作算子层面(Operator Level)
可以在程序中的具体算子中进行灵活控制并行度,在每个算子的后面进行并行度的设置
val env =ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val data = env.fromElements(1,2,3,4)
data.map(x=>x*2).setParallelism(4)
env.setParallelism(2)
val data = env.fromElements(1,2,3,4)
data.map(x=>x*2).setParallelism(4)
执行环境层面(Execution Environment Level)
在程序中进行控制,对env进行设置并行度,这样设置的话,程序中的算子都是根据env.setParallelism指定的并行度
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // 设置并行度为8
env.setParallelism(8); // 设置并行度为8
客户端层面(Client Level)
在提交job的时候知道,通过-p参数来控制
bin/flink run -p 10 xxxx.jar
系统层面(System Level)
在配置文件flink-conf.yaml中进行设置,默认值是1,如果在启动任务或者代码中没有进行设置,那么就并行度就是配置文件中的值。
parallelism.default: 1
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
slot
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。
通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。
双流JOIN
flink CDC
flink 反压
当flink前面的算子计算速度快,后面的算子计算速度慢,就会出现反压,出现反向压力
flink反压解决方式
通过 Flink Web UI 自带的反压监控面板
通过 Flink 内置 Metrics接口
flink反压
视频链接
resultPartition
发送端:ResultSubPartition
inputGate
接收端:inputChannel
flink1.5后基于credit机制
1
2
3
4
5
6
7
8
9
flink CDC
1、flink-cdc首次加载数据是否为全量加载
2、小量数据加载到内存的场景
flink 双流join
可以实现双流join的方式有
1、union
2、connect
3、window join
4、Interval join
子主题
5、CoGroup
然后根据业务逻辑选择不同的实现方式
0 条评论
下一页