Flink总结
2020-07-21 10:21:07 12 举报
AI智能生成
Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。
作者其他创作
大纲/内容
Flink
Flink基础知识
Flink简介
特点
批流通一
高吞吐
低延迟
保证Exactly-Once
丰富的编程API
Flink编程模型
抽象数据集
DataStream(实时)
Dataset(离线)
Flink的重要概念
JobManager
Flink中的管理进程
TaskManager
Flink中负责执行管理资源和执行subTask的进程
Client
Flink用来提交任务和客户端,可以是用命令提交,也可以用浏览器提交
Task
Task是一个阶段多个功能相同subtask的集合,类似Spark中的TaskSet
subTask
subTask是Flink中任务最小执行单元,是一个Java类的实例,这个java类中有属性和方法,完成具体的计算逻辑
Operator Chain
没有shuffle的多个算子合并在一个subTask中就形成了Operator Chain,类似Spark中的Pipeline
Slot
Flink中计算(内存)资源进行隔离的单元,一个Slot中可以运行多个subTask,但是这些subTask必须是来自同一个application的不同阶段的subTask
State
Flink任务在运行过程中的中间结果
Checkpoint
Flink用来将中间计算结果持久化的指定的存储系统的一种定期执行的机制
StateBacked
Flink用来存储中间计算结果的存储系统,Flink支持三种StateBackend,分别是Memory、FsBackend、RocksDB
Flink阶段划分
调用KeyBy这样的产生Shuffle的算子
Task的并行度发送变化
调用startNewChain方法会划分
调用disableChainning
Flink共享资源槽
Flink的任务资源槽的默认名称为default
可以通过调用slotSharingGroup方法指定槽位的名称
如果改变共享槽位的名称后,后面的没有在设置共享槽位的名称,那么跟上一次改变槽位的名称一致;槽位名称不同的subTask不能同在一个槽位中执行
槽位名称不同的subTask不能同在一个槽位中执行
Flink重启策略
Flink开启checkpoint功能,同时就开启了重启策略,默认是不开重启
Flink的重启策略可以配置成启动固定次数且每次延迟指定时间启动
Flink任务出现异常后,会根据配置的重启策略重新启动,即将原来的subTask释放,从新生成subTask并调度到TaskManager的slot中运行
Flink任务重启后,从新生成的subTask被调度到TaskManager中,会从Stagebackend中恢复上一次checkpoint的状态
Flink的时间
ProcessingTime
即数据被Operator处理时的时间
IngestionTime
数据从消息中间件读取,以后要将数据的产生时间提取出来
EventTime
数据产生的时间,以后要将数据的产生时间提取出来
Flink的Window
CountWindow
按照数据的条数触发窗口的执行
TimeWindow
滚动窗口
滑动窗口
会话窗口
Flink的WaterMark
Watermark中文翻译叫水位线,是Flink中一种延迟触发任务机制,跟EventTime结合使用
要设置使用EventTime作为时间标准env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime
要提取数据的EventTime作为TimeStamp
可以设置最大乱序延迟时间
watermark = 数据所携带的最大EventTime - 延迟时间
触发时机: waterMark的时间 >= 一个窗口的结束边界
Flink侧输出流
数据流拆分
Flink中,可以将一个流中的数据根据数据的不同属性进行if判断或模式匹配,然后给各个流打上标签,以后可以根据标签的名字,取出想要的类型的数据
侧输出流的好处要比filter的效率高,不必对数据进行多次处理,就可以将不同类型的数据拆分
首先定义一个标签:OutputTag<LogBean> flowOutputTag = new OutputTag<LogBean>("flow-date"){};
最后可以通过标签名称从主流中获取想要的侧流: mainDataStream.getSideOutput(flowOutputTag);
获取窗口延迟数据
首先定义一个窗口延迟数据的标签:OutputTag<LogBean> lateOutputTag = new OutputTag<LogBean>("late-date");
在调用完窗口后调用.sideOutputLateData(lateOutputTag)
获取延迟数据:.getSideOutput(lateOutputTag);
Flink的State
State的概念
State是Flink计算过程的中间结果和状态信息,为了容错,必须把状态持久化到一个外部的系统中
State的分类
KeyState
调用keyBy方法后,每个分区中相互独立的state
OperateState
没有分组,每一个subTask自己维护一个状态
Broadcast state
广播state,一个可以通过connect方法获取广播流的数据
广播state通常做为字典数据、维度数据关联。广播到属于该任务的所有TaskManager中,类似map side join,提高效率
State的使用
先定义一个状态描述器
通过context获取state
对数据处理后要更新数据
自定义Operater
Source
通常继承RichSourceFunction或RichParallelSourceFunction
Tranformation
通常继承RichMapFunction
重写open方法
初始化工作,通常打开数据库连接或HttpClient等
重写invoke方法
对数据进行处理,可以使用open方法中定义的数据库连接和API请求接口
新写close方法
释放资源
Sink
通常继承RichSinkFunction
Flink保证ExactlyOnce
是使用执行ExactlyOnce的数据源,比如Kafka
使用FlinkKafkaConsumer,开启CheckPointing,偏移量会保存通过CheckPoint保存到StateBackend中,并且默认会将偏移量写入Kafka的特殊topic中,即:__consumer_offsets
FlinkKafkaConsumr的setCommitOffsetsOnCheckpoints参数默认true,即将偏移量写入到Kafka特殊的Topic中,目的是为了监控或重启任务没有指定savePoint时可以接着一起的偏移量继续消费
并且设置CheckpointingMode.EXACTLY_ONCE
存储系统支持覆盖(Redis、HBase、ES)
使用幂等性,将原来的数据覆盖
Barrier【隔离带】可以保证一个流水线中的所有算子都处理完成了在对该条数据做CheckPoint
存储系统不支持覆盖
要支持事务,成功了提交事务和更新偏移量,如果失败可以回滚且不更新偏移量,并且放序
Flink的JOIN
window join
Wind join如果并行的不为,任务不会触发执行,自定义一个Tigger,Trigger中有两个方法onElement (来一条数据)
interval join
先keyBy,再进行join
Flink的RocksDB StateBackend
作业:为了存储大量的state的数据,可以将state先在TaskManager中保存在一个rocksDB的数据库中,触发CheckPoint
在程序中设置
导入rocksDB的maven依赖
全局设置
Flink结合布隆过滤器
Flink流处理简介
Flink是什么
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算
为什么要用Flink
流数据更真实地反映了我们的生活方式传统的数据架构是基于有限数据集的
结果的准确性和良好的容错性
哪些行精需要处理流数据
电商和市场营销
数据报表、广告投放、业务流程需要
物联网
传感器实时数据采集和显示、实时报警,交通运输
电信业
基站流量调配
银行和金融业
实时结算和通知推送,实时检测异常行为
流处理的发展和演变
传统数据处理架构
事务处理
分析处理
将数据从业务数据库复制到数仓,再进行分析和查询
有状态的流式处理
流处理的演变
lambda架构
用两套系统,同时保证低延迟和结果准确
Storm
优点:低延迟、时间正确/语义化窗口
缺点:不支持高吞吐,在压力下保持正确
Spark Streaming
优点:高吞吐、在压力下保持正确
缺点:延迟高
优点:高吞吐、低延迟、时间正确/语义化窗口、在压力下保持正确、操作简单/表现力好
Flink的主要特点
事件驱动(EventTime)
基于流的世界观
在Flink的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流
分层API
越顶层越抽象,表达含义越简明,使用越方便越底层越具体,表达能力越丰富,使用越灵活
SQL/Table API(dynamic tables)
Flink的其它特点
支持事件时间(event-time)和处理时间(processing-time)
语义
精确一次(exactly-once)的状态一致性保证
低延迟,每秒处理数百万个事件,毫秒级延迟
与众多常用存储系统的连接
高可用,动态扩展,实现7*24小时全天候运行
Flink vs Spark Streaming
区别1
SparkStreaming微批(mirro-batching)
Flink流(stream)
区别2
数据模型
spark采用RDD模型,spark streaming的DStream实际上也就是一组组小指数据RDD的集合
flink基本数据模型是数据流,以及事件(Event)序列
运行时架构
spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
Flink运行架构
Flink运行时的组件
ResourceManager
资源管理器
Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8S,以及standalone部署
当JobManager申请插槽资源时,ResourceManager会将空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
作业管理器
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行
JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其他资源的JAR包。
JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含 了所有可以并发执行的任务。
JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
任务管理器
Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每个TaskManager都包含一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据
Dispacher
分发器
可以跨作业运行,它为应用提交提供了REST接口。
当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
Dispatcher也会启动一个WebUI,用来方便地展示和监控作业的执行的信息。
Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
任务提交流程
YARN
1.Flink Client 上传Flink的Jar包和配置到HDFS
2.Flink Client提交Job给ResourceManager
4.AM向RM申请资源
5.RM反回可用资源列表给AM
6.AM根据资源列表在对应的NodeManager启动TaskManager
分支主题
任务调度原理
TaskManager和Slots
Flink中每个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask
为了控制一个TaskManager能接收多少个task,TaskManager通过task slot来进行控制(一个TaskManager至少有一个slot)
默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务。这样的结果是,一个slot可以保存作业的整个管道。
Task Slot是静态的概念,是指TaskManager具有的并发执行能力
程序与数据流(DataFlow)
所有的Flink程序都是由三部分组成的:Source、Transfromation和Sink
Source负责读取数据源,Transformation利用各种算子进行处理加工,Sink负责输出
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分
每一个dataflow以一个或多个source开始以一个或多个sink结束。dataflow类似于任意的有向无环图(DAG)
在大部分情况下,程序中的转换运算(transformations) 跟dataflow中的算子(operator)是一一对应的关系
执行图(ExecutionGraph)
Flink中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
StreamGraph: 是根据用户通过Stream API编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph:StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构。主要的优化为,将多个符合条件的节点chain在一起作为一个节点
ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
物理执行图:JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的“图”,并不是一个具体的数据结构。
并行度(Parallelism)
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度。
一个程序中,不同的算子可能具有不同的并行度
算子之间传输数据的形式可以是one-to-one(forwarding)模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类
one-to-one: stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map算子的子作务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同。map、filter、flatMap等算子都是one-to-one的对应关系。
Redistributing: stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy基于hashCode重分区、而broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。
任务链(Operator Chains)
Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
相同并行度的one-to-one操作,Flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的subtask
并行度相同、并且是one-to-one操作,两个条件缺一不可
Flink window API
window概念
窗口(window)
一般真实的流都是无界的,怎样处理无界的数据?
可以把无限的数据流进行切分,得到有限的数据集进行处理--也就是得到有界流
窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析
window类型
时间窗口(Time Window)
滚动时间窗口(Tumbling Windows)
将数据依所固定的窗口长度对数据进行切分
时间对齐,窗口长度固定,没有重叠
滑动时间窗口(Sliding Windows)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由长度和滑动间隔组成
窗口长度固定,可以有重叠
会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
特点:时间无对齐
计数窗口(Count Window)
滚动计数窗口
滑动计数窗口
window API
窗口分配器--window()方法
我们可以用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其它处理操作。注意window()方法必须在keyBy之后才能用。
Flink提供了更加简单的.timeWindow和.countWindow方法,用于定义时间窗口和计数窗口
窗口分配器(window assigner)左闭右开【)包含开始,不包含结束
window()方法接收的输入参数是一个WindowAssigner
WindowAssigner负责将每条输入数据分发到正确的window中
Flink提供了通用的WindowAssigner
滚动窗口(tumbling window)
滑动窗口(sliding window)
会话窗口(session window)
全局窗口(global window)
创建不同类型的窗口
滚动时间窗口(tumbling time window)
.timeWindow(Time.seconds(15))
滑动时间窗口(sliding time window)
会话窗口(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10))
滚动计数窗口(tumbling count window)
.countWindow(5)
滑动计数窗口(sliding count window)
窗口函数(window function)
富函数(Rich Functions)
富函数(Rich Functions) “富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都 有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一 些生命周期方法,所以可以实现更复杂的功能。 RichMapFunction RichFlatMapFunction RichFilterFunction … Rich Function 有一个生命周期的概念。典型的生命周期方法有: open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。 close()方法是生命周期中的最后一个调用的方法,做一些清理工作。 getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
window function定义了要对窗口中收集的数据做的计算操作
增量聚合函数(incremental aggregation functions)
每条数据到来就进行计算,保持一个简单的状态
全窗口函数(full window functions)
先把窗口所有数据收集起来,等到计算的时候遍历所有数据
ProcessWindowFunction
其它可选API
.trigger() -- 触发器
定义window什么时候关闭,触发计算并输出结果
.evitor() -- 移除器
定义移除某些数据的逻辑
.allowedLateness() -- 允许处理迟到的数据
.sideOutputLateData() -- 将迟到的数据放入侧输出流
.getSideOutput() -- 获取侧输出流
Flink 中的时间语义和watermark
Flink中的时间语义
EventTime: 事件创建的时间
Ingestion Time: 数据进入Flink的时间
Processing Time: 执行操作算子的本地系统时间,与机器无关
哪种时间语义更重要
不同的时间语义有不同的应用场合
我们往往更关心事件时间(Event Time)
某些应用场合,不应该使用Processing Time
Event Time可以从日志数据的时间戳(timestamp)中提取
设置EventTime
我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性
具体的时间,还需要从数据中提取时间戳(timestamp)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 默认时间语义是:processes time // 从调用时刻开始给env创建的每一个stream追加时间特征 //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
乱序数据的影响
Flink 以Event Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子
由于网络、分布式等原因,会导致乱序数据的产生
乱序数据会让窗口计算不准确
水位线(Watermark)
怎样避免乱序数据带来计算不正确?
遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
Watermark是一种衡量EventTime进展的机制,可以设定延迟触发
Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现;
数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到过了,因此,window的执行也是由Watermark触发的。
watermark用来让程序自己平衡延迟和结果正确性
watermark的特点
watermark是一条特殊的数据记录
watermark必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退
watermark与数据的时间戳相关
watermark的传递、引入和设定
watermark的传递
watermark的引入
EventTime的使用一定要指定数据源中的时间戳
调用assignTimestampAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就可以指定watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000 } })
对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了
// 注意单位是毫秒,所以根据时间戳的不同,可能需要乘1000dataStream.(_.timestamp * 1000)
Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成watermark
// dataStream.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner可以有两种类型,都继承自TimestampAssigner
TimestampAssigner
定义了抽取时间戳,以及生成watermark的方法,有两种类型
AssignerWithPeriodicWatermarks
周期性的生成watermark: 系统会周期性的将watermark插入到流中
默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置
升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。
AssignerWithPunctuatedWatermarks
没有时间周期规律,可打断的生成watermark
watermark的设定
在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解
如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果
而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题
Flink的容错机制
一致性检查点(checkpoint)
Flink故障恢复机制的核心,就是应用状态的一致性检查点
有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务恰好处理完一个相同的输入数据的时候
从检查点恢复状态
在执行流应用程序期间,Flink会定期保存状态的一致检查点
如果发生故障,Flink将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
从检查点恢复状态步骤
遇到故障之后,第一步就是重启应用
第二步是从checkpoint中读取状态,将状态重置从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
第三步:开始消费并处理检查点到发生故障之间的所有数据这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
Flink检查点算法
检查点分界线(Checkpoint Barrier)
Flink的检查点算子用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中
原理示例
现在是一个有两个输入流的应用程序,用并行的两个Source任务来读取
JobManager会向每个source任务发送一条带有新检查点ID的消息,通过这种方式来启动检查点
数据源将它们的状态写入检查点,并发出一个检查点barrier状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达对于barrier已经到达的分区,继续到达的数据会被缓存而barrier尚未到达的分区,数据会被正常处理
当收到所有输入分区的barrier时,任务就将其状态保存到状态后端的检查点中,然后将barrier继续向下游转发
向下游转发检查点barrier后,任务继续正常的数据处理
Sink任务向JobManager确认状态保存到checkpoint完毕当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
保存点(save points)
Flink还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作
保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
Flink的状态一致性
状态一致性
什么是状态一致性
有状态的流处理,内部每个算子任务都可以有自己的状态
对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。
一条数据不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的
状态一致性分类
AT-MOST-ONCE(最多一次)
当任务故障时,最简单的做法是什么都不干,即不恢复丢失的状态,也不重播丢失的数揣。At-most-once语义的含义是最多处理一次事件。
AT-LEAST-ONCE(至少一次)
在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
EXACTLY-ONCE(精确一次)
恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
Flink使用了一种轻量级快照机制--检查点(checkpoint)来保证exactly-once语义
有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任都恰好处理完一个相同的输入数据的时候。
应用状态的一致检查点,是Flink帮障恢复机制的核心
端到端(end-to-end)状态一致性
目前我们看到的一致性保证都 是流处事器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以后还包含了数据源(例如Kafka)和输出到持久化系统
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性
整个端到端的一致性级别取决于所有组件一致性最弱的组件
端到端的精确一次(exactly-once)保证
内部保证--checkpoint
source端--可重设数据的读取位置
幂等写入(Idempotent Writes)
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了
事务写入(Transactional Writes)
事务(Transaction)
应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销
具有原子性:一个事务中的一系列的操作要么全部成功,要么一个都不做
实现思想
构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中
实现方式
把结果数据当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能这种方式一批搞定
DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink
实现步骤示例
对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里
然后将这些数据写入外部sink系统,但不提交它们--这时只是“预提交”
当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入
这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口。
2PC对外部sink系统的要求
外部sink系统必须提供事务支持,或者sink任务必须能够模拟外部系统上的事务
在checkpoint的间隔期间里,必须能够开启一个事务并接受数据写入
在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
sink任务必须能够在进程失败后恢复事务
提交事务必须是幂等操作
不同Source和Sink的一致性保证
Flink + Kafka端到端状态一致性的保证
内部--利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source--kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink--kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction
Exactly-once两阶段提交
JobManager协调各个TaskManager进行checkpoint存储
checkpoint保存在StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存
当checkpoint启动时,JobManager会将检查点分界钱(barrier)注入数据流
barrier会在算子间传递下去
每个算子会对当前的状态做个快照,保存到状态后端
checkpoint机制可以保证内部的状态一致性
每个内部的transform任务遇到barrier时,都会把状态存到checkpoint里
sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把状态保存到状态后端,并开启新的预提交事务
当所有算子任务的快照完成,也就是这次的checkpoint完成时,JobManager会向所有任务发通知,确认这次checkpoint完成
sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改为“已确认”
Exactly-once两阶段提交步骤
第一条数据来了之后,开启一个kafka的事务(transaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”
jobmanager触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知jobmanager
sink连接器收到barriter,保存当前状态,存入checkpoint,通知jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager收到所有任务的通知,发出确认信息,表示checkpoint完成
sink任务收到jobmanager的确认信息,正式提交这段时间的数据
外部kafka关闭事务,提交的数据可以正常消费了
Flink CEP简介
什么是CEP
Flink CEP是在Flink 中实现的复杂事件处理(CEP)库
CEP允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据--满足规则的复杂事件
CEP的特点
目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之彰的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件
Pattern API
处理事件的规则,被叫做“模式”(Pattern)
Flink CEP提供了Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列
模式组 --> 组合模式/模式序列 --> 个体模式
个体模式(Individual Patterns)
组成复杂规则的每一个单独的模式定义,就是“个体模式”
start.times(3).where(_.behavior.startsWith("fav"))
个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”单例模式只接收一个事件,而循环模式可以接收多个
量词(Quantifier)
可以在一个个体模式后追加量词,也就是指定循环次数
// 匹配出现4次start.times(4)
// 匹配出现0或4次start.times(4).optional
// 匹配出现1次或多次start.oneOrMore
// 匹配出现0次、2次或多次,并且尽可能多地重复匹配start.timesOrMore(2).optional.greedy
个体模式的条件
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据
CEP中的个体模式主要通过调用.where() .or()和 .until()来指定条件
按不同的调用方式,可以分成以下几类:
简单条件(Simple Condition)
通过.where() 方法对事件中的字段进行判筛选,决定是否接受该事件
start.where(event => event.getName.startsWith("foo"))
组合条件(Combining Condition)
将简单条件进行合并; .or() 方法表示或逻辑相连,where的直接组合就是AND
pattern.where(event => ... /* come condition */).or(event => ... /* or condition */)
终止条件(Stop Condition)
如果使用了oneOrMore或者oneOrMore.optional,建议使用.until() 作为终止条件,以便清理状态
迭代条件(Iterative Condition)
能够对模式之前所有接收的事件进行处理
组合模式/模式序列(Combining Patterns)
很多个体模式组合起来,就形成了整个的模式序列
模式序列必须以一个“初始模式”开始:val start = Pattern.begin("start")
不同的“近邻”模式
严格近邻(Strict Contiguity)
所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定
宽松近邻(Relaxed Contiguity)
允许中间出现不匹配的事件,由.followBy() 指定
非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)
进一步放宽条件,之前已经匹配过的事件也可以再次使用,由.followedByAny()指定
不希望出现某种近邻关系
.notNext() -- 不想让某个事件严格紧邻前一个事件发生
.notFollowedBy() -- 不想让某个事件在两件事件之间发生
需要注意:
所有模式序列必须以.begin()开始
模式序列不能以.notFollowedBy() 结束
“not”类型的模式不能被optional 所修饰
此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
next.within(Time.seconds(10))
模式的检测
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream
匹配事件的提取
创建PatternStream之后,就可以应用select或者flatselect方法,从检测到的事件序列中提取事件了
select() 方法需要输入一个 select function作为参数,每个成功匹配的事件序列都会调用它
超时事件的提取
当一个模式通过within关键词定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃; 为了能够处理这些超时的部分匹配,select和flatSelect API调用允许指定超时处理程序
超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个OutputTag定义接收到的超时事件序列
模式组(Groups of patterns)
将一个模式序列作为条件嵌套在个体模式里,成为一组模式
0 条评论
下一页