flink1.2详细知识点
2021-03-17 21:03:57 0 举报
AI智能生成
加强记忆,梳理框架。涵盖flink开发最新最全知识点
作者其他创作
大纲/内容
编程指南
运行架构
运行时组件
作业管理器(JobManger)
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
任务管理器(TaskManager)
Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
分发器(Dispatcher)
可以跨作业运行,它为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式
任务提交流程
Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
任务调度原理
客户端
客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow(JobGraph)给Master(JobManager),然后,客户端断开连接或者维持连接以等待接收计算结果
可以是运行在任何机器上
Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回
JobManager
主要负责调度 Job 并协调 Task 做 checkpoint
从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执
TaskManager
启动的时候就设置好了槽位数(Slot)
每个 slot 能启动一个 Task,Task 为线程
首先会启动一个 JobManger,和一个或多个的 TaskManager
Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行
TaskManager 将心跳和统计信息汇报给 JobManager
TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程
TaskManger与Slots
每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask
task slot表示TaskManager拥有资源的一个固定大小的子集
假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot
slots好处:一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备
这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存
如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的)
一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载
一个slot可以保存作业的整个管道
Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置
并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置
假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。
taskmanager、slot、并行度之间的关系
1.TM个数*每个TMslots=最大并行度=总slots数
2.source、transform、sink中任意并行度的设置不能超过最大并行度
3.source的并行度<=最大并行度
flink 读取kafka 数据,partition分配
1.采取取模运算
2.partition个数为6;并行度为3
程序与数据流(DataFlow)
source\\transform\\sink
执行图(ExecutionGraph)
分为四层
StreamGraph
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构
JobGraph
StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构
主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构
物理执行图
:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构
子主题
并行度(Parallelism)
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。One-to-one:stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。\t类似于spark中的窄依赖Redistributing:stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。\t类似于spark中的宽依赖
任务链(Operator Chains)
流处理api
Enviroment
流处理
StreamExecutionEnvironment.
getExecutionEnvironment
如果程序是独立调用的,则此方法返回本地执行环境
如果命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境
没有设置并行度,则以flink-conf.yaml为准,默认为1
createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度
createRemoteEnvironment
将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包val env = ExecutionEnvironment.createRemoteEnvironment(\"jobmanage-hostname\
批处理
ExecutionEnvironment
Source
从集合读取数据(fromCollection)
从element读取数据(fromElements)
从文件读取数据
val stream2 = env.readTextFile(\"YOUR_FILE_PATH\")
从kafka读取数据
kafka依赖<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.0</version></dependency>
val properties = new Properties()properties.setProperty(\"bootstrap.servers\
从自定义source读取数据
1.传入SourceFunction
2.val stream4 = env.addSource( new MySensorSource() )
3.class MySensorSource extends SourceFunction[SensorReading]{// flag: 表示数据源是否还在正常运行var running: Boolean = trueoverride def cancel(): Unit = {running = false}override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {// 初始化一个随机数发生器val rand = new Random()var curTemp = 1.to(10).map(i => ( \"sensor_\
transform
简单转换
map
val streamMap = stream.map { x => x * 2 }
可以返回转换成一个样例类类型val ds = inputStream.map(data=>{ val arr = data.split(\
flatmap
List(\"a b\
filter
val streamFilter = stream.filter{ x => x == 1}
多流转换
Split+SELECT
DataStream → SplitStreamDataStream拆分成两个或者多个DataStream
SplitStream→DataStream从一个SplitStream中获取一个或者多个DataStream
像sql语句单独加一列标签字段一样,然后去选择每个标签
传lambda表达式
val splitStream = stream2 .split( sensorData => { if (sensorData.temperature > 30) Seq(\"high\") else Seq(\"low\") } )val high = splitStream.select(\"high\")val low = splitStream.select(\"low\")val all = splitStream.select(\"high\
也可传入类
connect和comap
连接两个数据流,他们的数据类型保持不变。放在一个流中,统一处理,但内部各自的数据和形式不发生变化
两个流相互独立,但可以按同一个规则同时处理
connect的流的类型可以不一样,只能操作两个,connect之后可以调整为一样的
union
类型必须是一样
可以操作多个流
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)unionStream.print(\"union:::\")
keyBy转换
DataStream → KeyedStream
将一个流分成不相交的分区,每个分区有相同的key.每个key分在一个分区里
stream.keyBy(new DeviceIdPartitioner(initPartitions))
返回keyedStream,也是继承DataStream
rolling aggregation滚动聚合算子
针对keyedStream的每一个支流做聚合
sum/min/max/minBy/maxBy:底层继承的都是一个私有的aggregate方法
min如果并行计算,可能得不到想要的数据,如果全局排序只能设置并行度为1; 或者加入窗口;或者用reduce
Reduce
keyedStream->DataStream
分组数据流聚合操作
合并当前元素(当前这一条数据)和上次聚合(之前所有条数据)结果
包含每一次聚合的结果,而不是返回最后一次结果
val stream2 = env.readTextFile(\"YOUR_PATH\\\\sensor.txt\") .map( data => { val dataArray = data.split(\
支持的数据类型
基础数据类型
Java和Scala元组
scala样例类
JAVA简单对象
UDF函数
函数类
sourceFunction(特定)
对应富函数:RichSourceFunction
MapFunction(特定)
对应富函数:RichMapFunction
FilterFunction(特定)
对应富函数:RichFilterFunction
SinkFunction(特定)
对应富函数:RichSinkFunction
ProcessFunction(普遍适用)
对应富函数:RichProcessFunction(de)
.....
....
如何使用
1.继承或实现函数类/接口FilterFunction接口class FilterFilter extends FilterFunction[String] { override def filter(value: String): Boolean = { value.contains(\"flink\") }}val flinkTweets = tweets.filter(new FlinkFilter)
2.用匿名类的方式实现val flinkTweets = tweets.filter(new RichFilterFunction[String] {override def filter(value: String): Boolean = {value.contains(\"flink\")}})
3.实现函数类或者接口时候还可以传入参数,即定义构造器val tweets: DataStream[String] = ...val flinkTweets = tweets.filter(new KeywordFilter(\"flink\"))class KeywordFilter(keyWord: String) extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains(keyWord)}}
2.匿名函数
val tweets: DataStream[String] = ...val flinkTweets = tweets.filter(_.contains(\"flink\"))
逻辑相对比较简单时使用
Sink
通用 addSink
简单 writeAsCsv等
sinkRedis
1.引入连接器依赖<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version></dependency>
3.主函数调用val conf = new FlinkJedisPoolConfig.Builder().setHost(\"localhost\
JDBC自定义sink
1.引入依赖<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version></dependency>
2.添加myjdbcsink,实现RichSinkFunction
class MyJdbcSink() extends RichSinkFunction[SensorReading]{ var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ // open 主要是创建连接 override def open(parameters: Configuration): Unit = { super.open(parameters) conn = DriverManager.getConnection(\"jdbc:mysql://localhost:3306/test\
3.主函数调用 dataStream.addSink(new MyJdbcSink())
sinkKafka
sinkEs
window
window是一种切割无线数据为有限数块进行处理的手段
无线的stream拆分成有限大小的buckets桶
window分为两类
countwindow
window size指相同Key的元素的个数,不是输入的所有元素的总数
滚动窗口
滑动窗口
每收到两个相同的key数据就计算一次,每一次计算window范围是10
timewindow
滚动窗口tumbling window
特点:时间对齐,窗口长度固定,没有重叠
适合BI统计,每个时间段的聚合
滑动窗口sliding window
特点:时间对齐,窗口长度固定,可以有重叠。
对最近一个时间段内的统计,求某个接口5min的失败率
传入两个参数,一个是窗口大小,一个是步长
会话窗口 session window
由一系列事件组合一个指定事件长度的timeout间隔组成,时间无对齐
windowapi
.window()方法
一般用.window()来定一个窗口,基于这个window取操作。必须在keyBy之后才能使用
flink提供更加简单的.timewindow和.countWindow方法
如果需要定义偏移量等,则必须要用底层的.window(WindowAssigner)
在测试中经常遇到一个问题,拿一个文件测试,窗口设置15s怎么都没有输出,原因是文件几条数据,还没等到15s,瞬间计算完毕。
窗口分配器(window assigner)
window()方法接收的输入参数是一个WindowAssigner
WindowAssigner负责将每条输入的数据分发到正确的windwo
通用的是可调用window(),返回WindowStream类型传入WindowAssigner(窗口分配器)TumblingEventTimeWindows是Procet类型,不能直接new.调用of方法,传入相当于调用构造方法WindowStream又可以有max/min等操作
创建不同类型定位窗口
滚动时间窗口(无法指定偏移量) .timewindow(Time.second(15))
会话窗口(无简写,要传入WindowAssigner):.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
计数窗口
window function
定义了要对窗口中收集的数据做的计算操作
两类
\t增量聚合函数(incremental aggregation functions)
\t全窗口函数(full window functions)
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction就是一个全窗口函数。
其它可选api
\t.trigger() —— 触发器
定义 window 什么时候关闭,触发计算并输出结果
\t.evitor() —— 移除器
定义移除某些数据的逻辑
.allowedLteness()
允许处理迟到的数据,可传入1min,就是允许处理迟到1分钟的数据
\t.sideOutputLateData()
将迟到的数据放入侧输出流
\t.getSideOutput()
获取侧输出流
时间语义与watermark
时间语义
event time
如果要使用EventTime,那么需要引入EventTime的时间属性
val env = StreamExecutionEnvironment.getExecutionEnvironment// 从调用时刻开始给env创建的每一个stream追加时间特征env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).需要结合WaterMark,指定消息中哪个字段是event_time
Ingestion Time
数据进入Flink的时间
Processing Time
每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time
watermark
概念
乱序-Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去此时必须有一个机制来保证一个特定的时间后,必须触发window进行统计,这个机制就是watermark
Watermark是一种衡量Event Time进展的机制
watermark是一种处理乱序的机制,通常用watermark+window来处理乱序
数据流中的watermark用于表示,timestamp小于Watermark的数据,都已经到达了
举例:我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。也就是说,数据的时间戳不断变化,不断递增,当增大到一个【时间戳-t】恰好等于窗口时间,则窗口被触发关闭。
只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗
原理和特点
是一条特殊的数据记录,本身会有时间戳
watermark必须单调递增,确保事件时间时钟向前推进,而不是后退
watermark与数据的时间戳有关,即延迟
如何传递呢
如果是顺序的单个任务很好传递
并行任务,上游多个子任务,下游多个子任务。那么这些子任务如何更新和分配watermark呢
上游往下游传递时,直接广播,如上游任务watermark是4,直接广播给下游,表明4以前的数据都已接收处理完毕,你们下游不会再接收到4以前的数据了。那么问题来了,上游有多个并行子任务(分区),每个分区都会有自己的watermark,这个广播给下游的4是如何来决策的呢?我们举例来说明这个问题。假设现在有依次是 A到B到C的任务. B作为C的上游,会有多个分区,每个分区有一个WM,会把这些分区中最小的WM传递给C。当A的WM广播给B时,B用A的WM去更新其多分区WM中最小的值,并再更新完毕之后取其中WM最小的值发送给C。以此不断推移
Watermark的引入
1.assignAscendingTimestamps(升序数据,也不需要引入)
2.assignTimestampsAndWatermarks(乱序数据)
val env = StreamExecutionEnvironment.getExecutionEnvironment// 从调用时刻开始给env创建的每一个stream追加时间特性env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val readings: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(new MyAssigner())MyAssigner有两种类型 AssignerWithPeriodicWatermarks AssignerWithPunctuatedWatermarks以上两个接口都继承自TimestampAssigner。
1.AssignerWithPunctuatedWatermarks这个flink没有封装好的类,只能自己去实现。间断式地生成,根据数据的规则
和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
适用比较密集,周期性的生成。为什么不每条数据都产生一个watermark?因为大数据量性能问题
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 每隔5秒产生一个watermarkenv.getConfig.setAutoWatermarkInterval(5000)
产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark
EvnetTime在window中的使用
滚动窗口(TumblingEventTimeWindows)
def main(args: Array[String]): Unit = { // 环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream(\"localhost\
滑动窗口(SlidingEventTimeWindows)
def main(args: Array[String]): Unit = { // 环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream(\"localhost\
会话窗口(EventTimeSessionWindows)
def main(args: Array[String]): Unit = { // 环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream(\"localhost\
ProcessFunction API(底层API)
转换算子无法访问时间戳信息和水位线信息
Low-Level转换算子
可以访问时间戳
可以访问watermark
可以注册定时事件
8个Process Function
ProcessFunction
•\tKeyedProcessFunction
用来操作KeyedStream
处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口
都有open()、close()和getRuntimeContext()等方法
额外提供两个方法
流中的每一个元素都会调用这个方法
调用结果将会放在Collector数据类型中输出
Context可以访问元素的时间戳,元素的key,以及TimerService时间服务
Context还可以将结果输出到别的流(side outputs)
当之前注册的定时器触发时调用
参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合
OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
•\tCoProcessFunction
•\tProcessJoinFunction
•\tBroadcastProcessFunction
•\tKeyedBroadcastProcessFunction
•\tProcessWindowFunction
•\tProcessAllWindowFunction
TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法
•\tcurrentProcessingTime(): Long 返回当前处理时间
•\tcurrentWatermark(): Long 返回当前watermark的时间戳
•\tregisterProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
•\tregisterEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
•\tdeleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
•\tdeleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。
举例
侧输出流(SideOutPut)
大部分的datastream api是单一输出,是某种数据类型的流
split可以一条流拆分多流,但流的数据类型相同
process fucntion的sideoutput可产生多流,并且每个流的数据类型可以不一样
一个side output可以定义为OutputTag[X]对象
X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
val monitoredReadings: DataStream[SensorReading] = readings .process(new FreezingMonitor)monitoredReadings .getSideOutput(new OutputTag[String](\"freezing-alarms\")) .print()readings.print()
CoprocessFunction
两条输入流,提供了操作每个输入流的方法:processElement1和processElement2
状态编程和容错机制
无状态
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警
有状态
有状态的计算则会基于多个事件输出结果
所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算
所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计
流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算
可以认为状态就是一个本地变量,可被任务的业务逻辑访问
为了使运行时的Flink了解算子的状态,算子需要预先注册其状态
两种类型的状态
算子状态(operator state)
作用范围限定为算子任务,
由同一并行任务所处理的所有数据都可以访问到相同的状态
状态对于同一任务而言是共享的
算子状态不能由相同或不同算子的另一个任务访问
三种基本数据结构
列表状态(List state)将状态表示为一组数据的列表
联合列表状态(Union list state)也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复
广播状态(Broadcast state)如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
键控状态(keyed state)
根据输入数据流中定义的键(key)来维护和访问的,不同的key有不同的状态
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态
当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key,具有相同key的所有数据都会访问相同的状态
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)
N种数据类型
ValueState[T]保存单个的值,值的类型为T。o\tget操作: ValueState.value()o\tset操作: ValueState.update(value: T)
ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:o ListState.add(value: T)o ListState.addAll(values: java.util.List[T])o ListState.get()返回Iterable[T]o ListState.update(values: java.util.List[T])
状态编程
RichFlatMapFunction
flatMapState
状态一致性
引入状态,自然引入一致性
假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
一致性级别
at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。既不恢复丢失的状态,也不重播丢失的数据。语义含义是最多处理一次时间。 举例:视频流丢一帧没关系,如TCP握手连接,目标是侦测网络连接
at-least-once: 也就是说,计数程序在发生故障后可能多算,但是绝不会少算,这表示计数结果可能大于正确值,但绝不会小于正确值 举例:1)统计当前数据出现过没有,出现了就true,没出现就false,多算没关系只要出现即可
exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。没有事件丢失,只处理一次,恰好处理一次,内部状态也仅仅更新一次
storm和spark streaming保证exactly-once:无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束
Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
端到端(end-to-end)状态一致性
除了流处理器以外还包含了数据源(kafka)和输出到持久化系统
各环节一致性划分
内部保证:依赖checkpoint
source端:外部源可重设数据的读取位置
sink端:保证从故障回复时,数据不会重复写入外部系统
事务写入:1.原子性,一系列操作要么全部成功,要么一个都不做。比如银行转账,要等到转入成功之后才可以转出2.需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
两种实现方式
两阶段提交(2pc)1.sink任务启动一个事务,将接下来的数据全部写入事务2.写入外部sink时,只是预提交3.收到cp通知时,才正式提交事务,实现真正写入4.DataStream API 提供了TwoPhaseCommitSinkFunction 接口
必须提供事务支持,sink任务必须能够模拟外部系统上的事务
提交事务必须是幂等操作
cp的间隔时间里,必须能够开启一个事务并接受数据写入
收到cp通知前,事务必须是等待提交状态
一致性检查点checkpoint
核心作用是确保状态正确,即使遇到程序中断,也要正确
数项链珠子
在项链上每隔一段就松松地系上一根有色皮筋
当珠子被拨动的时候,皮筋也可以被拨动;
你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数
你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少
类似于皮筋标记,关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的。 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态。如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单
检查点算法(barrier对齐)
检查点算法
简单的想法:暂停应用,保存状态到检查点
flink改进:chandy算法的分布式快照。检查点的保存和数据处理分离。
检查点分界线(checkpoint barrier),相当于数据流插入一条特殊数据。检查点如何保存呢,是jobmanager发出指令
checkpoint配置
flink+kafka
内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
flink由jobmanager协调各个tm进行checkpoint的存储,保存在statebackend中,默认statebackend是内存
1.第一条数据来了之后,开启一个 kafka 的事务(transaction)2.正常写入 kafka 分区日志但标记为未提交,这就是“预提交”3.jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager4.sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据5.jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成6.sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据7.外部kafka关闭事务,提交的数据可以正常消费了。
3种状态后端
•\tMemoryStateBackend内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
特点:快速、低延迟、但是不稳定。一般生产环境不用
• FsStateBackend将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
val env = StreamExecutionEnvironment.getExecutionEnvironmentval checkpointPath: String = ???val backend = new RocksDBStateBackend(checkpointPath)env.setStateBackend(backend)env.setStateBackend(new FsStateBackend(\"file:///tmp/checkpoints\
特点:内存级别的访问速度,一定的容错性。但是当本地的状态需要特别大时,本地TM的JVM堆无法存储这么大的内存
• RocksDBStateBackend将所有状态序列化后,存入本地的RocksDB中存储。
div style=\
保存点(Savepoints)
自定义的镜像保存功能
算法与checkpoints完全相同,可以认为就是具有一些额外元数据的检查点
与checkpoints不同,不会自动创建保存点,需要明确触发创建操作
优势
有计划的手动备份:时间点,什么状态,什么功能保存
版本迁移:如flink1.9迁移到1.20
暂停和重启应用:比如不重要的应用给重要应用让路资源时
更新应用程序
Table API与SQL
pom依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.10.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.10.0</version></dependency>
简单举例
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream = env.readTextFile(\"..\\\\sensor.txt\") val dataStream = inputStream .map( data => { val dataArray = data.split(\
基本程序结构
//创建val tableEnv = ...
//创建一张读表tableEnv.connect(...).createTemporaryTable(\"inputTable\")
//注册一张输出表tableEnv.connect(...)..createTemporaryTable(\"outputTable\")
//通过table api查询算子,得到一张结果表val result = tableEnv.from(\"inputTable\").select(...)
//通过sql查询语句,得到一张表val sqlResult = tableEnv.sqlQuery(\"select .. from ..\")
//将结果写入输出表sqlResult.insertInto(\"outputTable\")
执行环境
基于老版本planner流处理
val settings = EnvrionmentSettings.newInstance().font color=\"#c41230\
基于老版本planner批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironmentval oldBatchTableEnv =BatchTableEnvironment.create(batchEnv)
基于新版本blink流处理
val settings = EnvrionmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val blinkStreamTableEnv = font color=\"#c41230\
基于新版本blink批处理
val settings = EnvrionmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val blinkBatchTableEnv = TableEnvironment.create(settings)
表的概念
表由一个identifier来制定
Catalog名
数据库
对象名
创建表
调用.connect()方法,连接外部系统
tableEnv.connect().withFormat(..).withSchema(...).createTemporaryTbale(..)
读取文件
val filePath=\"/../../file\"tableEnv.connect(new FileSystem().path(filepath)).withFormat(new OldCsv())//OldCsv弃用,引入flink-csv包,切换成新的csv接口.withSchema(new Schema() .field(\"id\
读取kafka
tableEnv.connect(new Kafka() .version(\"0.11\") .topic(\"sensor\") .property(\"zk.connect\
查询转换
DataStream转换成表
数据类型与schema的对应
1.基于名称(name-based)-都可改名
2.基于位置(position-based)-都可改名
表转换成DataStream
需要制定数据类型,要将每一行转换成数据类型
表作为流式查询结果,是动态更新
转换有两种模式:追加(Append)撤回(Retract)
创建临时视图
createTemporaryView
表的输出
tableEnv.connect() .createTemporaryTable(\"outputTable\")val rt:Table = ..rt.insertInto(\"outtable\")
输出到文件
输出到hive
输出到kafka
输出到es
upsert模式
输出到mysql
table.toAppendStream
table.toRetractStream
聚合,有状态数据,类似拉链,对过期的数据作废。
key不断加和,过期的值打个false的标记
更新模式
追加(Append)模式1.只做插入操作
撤回(Retract)模式1.添加add和撤回(retract)消息2.插入、删除、更新
更新插入(Upsert)
查看执行计划
TableEnvironment.explain()方法,返回一个字符串,描述三个计划1.优化的逻辑查询计划2.优化后的逻辑查询计划3.实际执行的计划val explain:String = tableEnv.explain(rstable)println(explain)
动态表
动态表随时间变化
持续查询
永不终止,并会生成另一个动态表
字段
tableapi窗口聚合操作
// 统计每10秒中每个传感器温度值的个数def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream = env.readTextFile(\"..\\\\sensor.txt\") val dataStream = inputStream .map( data => { val dataArray = data.split(\
GROUP BY
函数
内置函数
比较函数
sql
v1 = v2v1 > v2
tableapi
any1 === any2any1 > any2
逻辑函数
boolean1 or boolean2boolean1 IS FALSENOT boolean
||.isFalse!BOOLEAN
算数函数
+n1.power(n2)
其他函数
用户自定义函数(UDF)
标量函数(Scalar Functions)一对一
表函数(Table Functions)一对多
聚合函数(UDAF)把一个表中数据,聚合成一个标量值(多对一)
继承AggregationFunction
工作原理
首先,他需要一个累加器,保存聚合中间结果的数据结构。可以createAccumulator()方法创建空累加器
随后,对每个输入行调用函数的accumulate()方法来更新累加器
处理完所有行,调用函数的getValue()方法计算并返回最终结果
必须实现三个方法
getValue()
accumulate()
createAccumulator()
表聚合函数(TableAggreatieFunctions)(多对多) top值
时间窗口
定义处理时间(Processing Time)-三种方法
2.DDL定义
3.定义TableSchema时指定
3.定义 Tableschema时指定
窗口,主要两种
Group Windows(分组窗口)根据时间或行计数间隔
Table API中使用Group Windows
会话窗口
SQL中使用Group Windows
Over Windows针对每个输入行,计算相邻行范围内的聚合
TableAPI使用over window
无界over windows
有界over windows
sql使用over window
编写SQL
Flink的监控和优化
metrics
metric类型
counter
对一个计数器进行累加,对于多条或多兆数据一直往上加的过程
Gauge
meter
Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间
Histogram
Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等
metric group
Metric 在 Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识
•TaskManagerMetricGroup •TaskManagerJobMetricGroup •TaskMetricGroup •TaskIOMetricGroup •OperatorMetricGroup •${User-defined Group} / ${User-defined Metrics} •OperatorIOMetricGroup•JobManagerMetricGroup •JobManagerJobMetricGroup
Table API & SQL
表api
SQL
统一流和批,很容易被人接受
依赖
表api: flink-table-planner_2.11
批处理查询:flink-tabl-api-java/scala-brige_2.11
流处理:flink-streaming-scala_2.11
kafka依赖:flink-connector-kafka
json依赖:flink-json
Stream SQL
获取运行环境(sEnv)
获取表api运行环境(sTableEnv)
StreamTableEnvironment.create
读取数据源
数据转换
DataStream转为table
注册为一个表
registerTable
sqlQuery
Table table2 = sTableEnv.sqlQuery().select(\"fields\")
表转换为流
执行
Batch SQL
获取运行环境
创建Batch环境
读取 数据返回DataSet
tEnv.registerDataSet(name=\"user1\
Table table= tEnv.sqlQuery(\"select * from user1\")
表转为Dataset
toDataSet
Batch Table
Table table2 =table1.select(\"name\")
Table API & SQL流处理
动态表和连续查询
动态表会随时间变化
查询动态表生成的是连续查询
查询不断更新其结果表
流上定义表
连续查询
表转流
流查询的结果表被动态更新,随着新纪录到达发生变化
两种模式
Append-only Modeinsert,且以前结果永不更新
流转为表
Table table sTableEnv.fromDataStream(ds2)
sTableEnv.registerTable(name=\"wc\
append mod
分组统计涉及到更新,如果用append则报错
Retract Mode
始终都可以使用此模式
返回false,代表数据撤回
flink sql connector读取kafka
sTableEnv.connect(new Kafka().version() .topci().startEaliest() .property(\"group.id\
查询转换为table
table转为流
追加模式
Table API
tEnv.scan
filter/groupBy/select
SQL操作
基于Apache Calcite
flink知识点
系统架构
组件栈
Deploy层
flink部署模式 Single JVM
Clusterstandalone\\yarn
Runtime层
主要负责flink的核心实现,支持分布式stream处理,jobgraph到executiongraph的映射、调度
API层
DataStream API
DataSetAPI
Library层
CEP
EventProcessing对复杂事件处理的库
FlinkML机器学习\\Gelly图计算
0 条评论
回复 删除
下一页