flink1.2详细知识点
2021-03-17 21:03:57 0 举报
AI智能生成
加强记忆,梳理框架。涵盖flink开发最新最全知识点
作者其他创作
大纲/内容
编程指南
运行架构
运行时组件
作业管理器(JobManger)
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
任务管理器(TaskManager)
Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
分发器(Dispatcher)
可以跨作业运行,它为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式
任务提交流程
Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
任务调度原理
客户端
客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow(JobGraph)给Master(JobManager),然后,客户端断开连接或者维持连接以等待接收计算结果
可以是运行在任何机器上
Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回
JobManager
主要负责调度 Job 并协调 Task 做 checkpoint
从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执
TaskManager
启动的时候就设置好了槽位数(Slot)
每个 slot 能启动一个 Task,Task 为线程
首先会启动一个 JobManger,和一个或多个的 TaskManager
Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行
TaskManager 将心跳和统计信息汇报给 JobManager
TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程
TaskManger与Slots
每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask
一个TM能启动多少个task,通过slots来控制
task slot表示TaskManager拥有资源的一个固定大小的子集
假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot
slots好处:一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备
这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存
如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的)
一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载
一个slot可以保存作业的整个管道
Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置
并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置
假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。
taskmanager、slot、并行度之间的关系
1.TM个数*每个TMslots=最大并行度=总slots数
2.source、transform、sink中任意并行度的设置不能超过最大并行度
3.source的并行度<=最大并行度
4.source的并行度=topic partitions,正好的情况,一个并行度,读一个分区数据
source的并行度<topic partitions,部分并行度读多个分区
source的并行度>topic partitions,部分并行度没有数据的情况
source的并行度<topic partitions,部分并行度读多个分区
source的并行度>topic partitions,部分并行度没有数据的情况
flink 读取kafka 数据,partition分配
1.采取取模运算
2.partition个数为6;并行度为3
程序与数据流(DataFlow)
source\transform\sink
执行图(ExecutionGraph)
分为四层
StreamGraph
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构
JobGraph
StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构
主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构
物理执行图
:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构
子主题
并行度(Parallelism)
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
One-to-one:stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。
类似于spark中的窄依赖
Redistributing:stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。
类似于spark中的宽依赖
One-to-one:stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。
类似于spark中的窄依赖
Redistributing:stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。
类似于spark中的宽依赖
任务链(Operator Chains)
流处理api
Enviroment
流处理
StreamExecutionEnvironment.
getExecutionEnvironment
如果程序是独立调用的,则此方法返回本地执行环境
如果命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境
没有设置并行度,则以flink-conf.yaml为准,默认为1
createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度
createRemoteEnvironment
将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
批处理
ExecutionEnvironment
Source
从集合读取数据(fromCollection)
// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
stream1.print("stream1:").setParallelism(1)
env.execute()
}
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
stream1.print("stream1:").setParallelism(1)
env.execute()
}
}
从element读取数据(fromElements)
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
numbers.map( n => n + 1 )
从文件读取数据
val stream2 = env.readTextFile("YOUR_FILE_PATH")
从kafka读取数据
kafka依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
从自定义source读取数据
1.传入SourceFunction
2.val stream4 = env.addSource( new MySensorSource() )
3.class MySensorSource extends SourceFunction[SensorReading]{
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 初始化一个随机数发生器
val rand = new Random()
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
)
while(running){
// 更新温度值
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian() )
)
// 获取当前时间戳
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(100)
}
}
}
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 初始化一个随机数发生器
val rand = new Random()
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
)
while(running){
// 更新温度值
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian() )
)
// 获取当前时间戳
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(100)
}
}
}
transform
简单转换
map
val streamMap = stream.map { x => x * 2 }
可以返回转换成一个样例类类型
val ds = inputStream.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)})
val ds = inputStream.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)})
flatmap
List("a b","c d")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
filter
val streamFilter = stream.filter{
x => x == 1
}
x => x == 1
}
多流转换
Split+SELECT
DataStream → SplitStream
DataStream拆分成两个或者多个DataStream
DataStream拆分成两个或者多个DataStream
SplitStream→DataStream
从一个SplitStream中获取一个或者多个DataStream
从一个SplitStream中获取一个或者多个DataStream
像sql语句单独加一列标签字段一样,然后去选择每个标签
传lambda表达式
val splitStream = stream2
.split( sensorData => {
if (sensorData.temperature > 30) Seq("high") else Seq("low")
} )
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
.split( sensorData => {
if (sensorData.temperature > 30) Seq("high") else Seq("low")
} )
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
也可传入类
connect和comap
连接两个数据流,他们的数据类型保持不变。放在一个流中,统一处理,但内部各自的数据和形式不发生变化
两个流相互独立,但可以按同一个规则同时处理
connect的流的类型可以不一样,只能操作两个,connect之后可以调整为一样的
val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy")
)
val connected = warning.connect(low)
val coMap = connected.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy")
)
union
类型必须是一样
可以操作多个流
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")
unionStream.print("union:::")
keyBy转换
keyBy(返回键控流),一般都要加一个聚合算子
DataStream → KeyedStream
将一个流分成不相交的分区,每个分区有相同的key.每个key分在一个分区里
stream.keyBy(new DeviceIdPartitioner(initPartitions))
返回keyedStream,也是继承DataStream
rolling aggregation
滚动聚合算子
滚动聚合算子
针对keyedStream的每一个支流做聚合
sum/min/max/minBy/maxBy:底层继承的都是一个私有的aggregate方法
min如果并行计算,可能得不到想要的数据,如果全局排序只能设置并行度为1; 或者加入窗口;或者用reduce
Reduce
keyedStream->DataStream
分组数据流聚合操作
合并当前元素(当前这一条数据)和上次聚合(之前所有条数据)结果
包含每一次聚合的结果,而不是返回最后一次结果
val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.keyBy("id")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.keyBy("id")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )
以上是lambda表达式,也可以实现一个类。 继承ReduceFunction
class MyReduceF extends ReduceFunction[SensorReading]{
override def reduce(v1:SensorReading,v2:SensorReading):SensorReading =
SensorReading(v1.id,v2.timestamp,v1.tmperature.min(v2.temperature))}
class MyReduceF extends ReduceFunction[SensorReading]{
override def reduce(v1:SensorReading,v2:SensorReading):SensorReading =
SensorReading(v1.id,v2.timestamp,v1.tmperature.min(v2.temperature))}
支持的数据类型
基础数据类型
支持所有的Java和Scala基础数据类型,Int,Double,Long,String
Java和Scala元组
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
scala样例类
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
JAVA简单对象
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
其它arrays,lists,maps,enums等
UDF函数
- 函数类
sourceFunction(特定)
对应富函数:RichSourceFunction
MapFunction(特定)
对应富函数:RichMapFunction
FilterFunction(特定)
对应富函数:RichFilterFunction
SinkFunction(特定)
对应富函数:RichSinkFunction
ProcessFunction(普遍适用)
对应富函数:RichProcessFunction(de)
.....
....
如何使用
1.继承或实现函数类/接口FilterFunction接口
class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}}
val flinkTweets = tweets.filter(new FlinkFilter)
class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}}
val flinkTweets = tweets.filter(new FlinkFilter)
2.用匿名类的方式实现
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}})
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}})
3.实现函数类或者接口时候还可以传入参数,即定义构造器
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
2.匿名函数
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
val flinkTweets = tweets.filter(_.contains("flink"))
逻辑相对比较简单时使用
3.富函数
比普通函数富在哪里?
1)有生命周期,比如获取一些初始化参数,一些数据库的链接 open()
2)可以获取上下文进行状态编程、任务名获取等等,getRuntimeContext.getstate
3)做一些收尾工作,比如关闭链接或者清空状态 close()
比普通函数富在哪里?
1)有生命周期,比如获取一些初始化参数,一些数据库的链接 open()
2)可以获取上下文进行状态编程、任务名获取等等,getRuntimeContext.getstate
3)做一些收尾工作,比如关闭链接或者清空状态 close()
Sink
通用 addSink
简单 writeAsCsv等
sinkRedis
1.引入连接器依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
2.定义redis mapper类,保存到redis调用的命令
class MyRedisMapper extends RedisMapper[SensorReading]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
}
override def getValueFromData(t: SensorReading): String = t.temperature.toString
override def getKeyFromData(t: SensorReading): String = t.id
}
class MyRedisMapper extends RedisMapper[SensorReading]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
}
override def getValueFromData(t: SensorReading): String = t.temperature.toString
override def getKeyFromData(t: SensorReading): String = t.id
}
3.主函数调用
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )
1.FlinkJedisPoolConfig,连接池配置,是个private私有构造方法。无法直接进行new FlinkJedisPoolConfig
2.那么如何来用呢? 这个类有一个内部类builder是public类型,直接使用builder来构建
3.通用:构造方法私有化,构建一个内部类Builder,调用builder方法,就相当于调用了构造方法,把这写参数传递进来
2.那么如何来用呢? 这个类有一个内部类builder是public类型,直接使用builder来构建
3.通用:构造方法私有化,构建一个内部类Builder,调用builder方法,就相当于调用了构造方法,把这写参数传递进来
JDBC自定义sink
1.引入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
2.添加myjdbcsink,实现RichSinkFunction
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// 调用连接,执行sql
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// 调用连接,执行sql
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
3.主函数调用 dataStream.addSink(new MyJdbcSink())
sinkKafka
sinkEs
window
window是一种切割无线数据为有限数块进行处理的手段
无线的stream拆分成有限大小的buckets桶
window分为两类
countwindow
window size指相同Key的元素的个数,不是输入的所有元素的总数
滚动窗口
val minTempPerWindow: DataStream[(String, Double)] = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.countWindow(5)
.reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.countWindow(5)
.reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
滑动窗口
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0)
//每当某一个key的个数达到2的时候,触发计算,计算最近该key最近10个元素的内容
val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)
//每当某一个key的个数达到2的时候,触发计算,计算最近该key最近10个元素的内容
val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)
每收到两个相同的key数据就计算一次,每一次计算window范围是10
timewindow
滚动窗口tumbling window
特点:时间对齐,窗口长度固定,没有重叠
适合BI统计,每个时间段的聚合
val minTempPerWindow = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
滑动窗口sliding window
特点:时间对齐,窗口长度固定,可以有重叠。
对最近一个时间段内的统计,求某个接口5min的失败率
传入两个参数,一个是窗口大小,一个是步长
val minTempPerWindow: DataStream[(String, Double)] = dataStream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15), Time.seconds(5))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
// .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5))
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15), Time.seconds(5))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
// .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5))
会话窗口 session window
由一系列事件组合一个指定事件长度的timeout间隔组成,时间无对齐
windowapi
.window()方法
一般用.window()来定一个窗口,基于这个window取操作。必须在keyBy之后才能使用
flink提供更加简单的.timewindow和.countWindow方法
如果需要定义偏移量等,则必须要用底层的.window(WindowAssigner)
在测试中经常遇到一个问题,拿一个文件测试,窗口设置15s怎么都没有输出,原因是文件几条数据,还没等到15s,瞬间计算完毕。
窗口分配器(window assigner)
window()方法接收的输入参数是一个WindowAssigner
WindowAssigner负责将每条输入的数据分发到正确的windwo
通用的是可调用window(),返回WindowStream类型
传入WindowAssigner(窗口分配器)
TumblingEventTimeWindows是Procet类型,不能直接new.
调用of方法,传入相当于调用构造方法
WindowStream又可以有max/min等操作
传入WindowAssigner(窗口分配器)
TumblingEventTimeWindows是Procet类型,不能直接new.
调用of方法,传入相当于调用构造方法
WindowStream又可以有max/min等操作
创建不同类型定位窗口
滚动时间窗口(无法指定偏移量) .timewindow(Time.second(15))
滑动时间窗口(无法指定偏移量) .timeWindow(Time.second(15),Time.second(5))
会话窗口(无简写,要传入WindowAssigner):.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
计数窗口
window function
定义了要对窗口中收集的数据做的计算操作
两类
增量聚合函数(incremental aggregation functions)
每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数(full window functions)
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction就是一个全窗口函数。
其它可选api
.trigger() —— 触发器
定义 window 什么时候关闭,触发计算并输出结果
.evitor() —— 移除器
定义移除某些数据的逻辑
.allowedLteness()
允许处理迟到的数据,可传入1min,就是允许处理迟到1分钟的数据
.sideOutputLateData()
将迟到的数据放入侧输出流
.getSideOutput()
获取侧输出流
时间语义与watermark
时间语义
event time
事件创建的时间,时间戳,每一条日志都会记录自己的生成时间
如果要使用EventTime,那么需要引入EventTime的时间属性
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
需要结合WaterMark,指定消息中哪个字段是event_time
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
需要结合WaterMark,指定消息中哪个字段是event_time
Ingestion Time
数据进入Flink的时间
Processing Time
每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time
watermark
概念
乱序-Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去
此时必须有一个机制来保证一个特定的时间后,必须触发window进行统计,这个机制就是watermark
此时必须有一个机制来保证一个特定的时间后,必须触发window进行统计,这个机制就是watermark
Watermark是一种衡量Event Time进展的机制
watermark是一种处理乱序的机制,通常用watermark+window来处理乱序
数据流中的watermark用于表示,timestamp小于Watermark的数据,都已经到达了
watermark也可以理解成一种延迟触发机制。设置watermark的延迟时长为t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行
举例:我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。也就是说,数据的时间戳不断变化,不断递增,当增大到一个【时间戳-t】恰好等于窗口时间,则窗口被触发关闭。
只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗
原理和特点
是一条特殊的数据记录,本身会有时间戳
watermark必须单调递增,确保事件时间时钟向前推进,而不是后退
watermark与数据的时间戳有关,即延迟
如何传递呢
如果是顺序的单个任务很好传递
并行任务,上游多个子任务,下游多个子任务。那么这些子任务如何更新和分配watermark呢
上游往下游传递时,直接广播,如上游任务watermark是4,直接广播给下游,
表明4以前的数据都已接收处理完毕,你们下游不会再接收到4以前的数据了。
那么问题来了,上游有多个并行子任务(分区),每个分区都会有自己的watermark,这个广播给下游的4是如何来决策的呢?
我们举例来说明这个问题。假设现在有依次是 A到B到C的任务. B作为C的上游,会有多个分区,每个分区有一个WM,会把这些分区中最小的WM传递给C。当A的WM广播给B时,B用A的WM去更新其多分区WM中最小的值,并再更新完毕之后取其中WM最小的值发送给C。以此不断推移
表明4以前的数据都已接收处理完毕,你们下游不会再接收到4以前的数据了。
那么问题来了,上游有多个并行子任务(分区),每个分区都会有自己的watermark,这个广播给下游的4是如何来决策的呢?
我们举例来说明这个问题。假设现在有依次是 A到B到C的任务. B作为C的上游,会有多个分区,每个分区有一个WM,会把这些分区中最小的WM传递给C。当A的WM广播给B时,B用A的WM去更新其多分区WM中最小的值,并再更新完毕之后取其中WM最小的值发送给C。以此不断推移
Watermark的引入
1.assignAscendingTimestamps(升序数据,也不需要引入)
val stream: DataStream[SensorReading] = ...
val withTimestampsAndWatermarks = stream
.assignAscendingTimestamps(e => e.timestamp)
>> result: E(1), W(1), E(2), W(2), ...
val withTimestampsAndWatermarks = stream
.assignAscendingTimestamps(e => e.timestamp)
>> result: E(1), W(1), E(2), W(2), ...
2.assignTimestampsAndWatermarks(乱序数据)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner有两种类型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上两个接口都继承自TimestampAssigner。
// 从调用时刻开始给env创建的每一个stream追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner有两种类型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上两个接口都继承自TimestampAssigner。
1.AssignerWithPunctuatedWatermarks
这个flink没有封装好的类,只能自己去实现。
间断式地生成,根据数据的规则
这个flink没有封装好的类,只能自己去实现。
间断式地生成,根据数据的规则
和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
val bound: Long = 60 * 1000
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
if (r.id == "sensor_1") {
new Watermark(extractedTS - bound)
} else {
null
}
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
r.timestamp
}
}
val bound: Long = 60 * 1000
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
if (r.id == "sensor_1") {
new Watermark(extractedTS - bound)
} else {
null
}
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
r.timestamp
}
}
2.AssignerWithPeriodicWatermarks(常用)
(周期性的生成wm,如每隔1s)
(周期性的生成wm,如每隔1s)
1.BoundedOutOfOrdernessTimestampExtractor
flink已封装好一个常用的周期性生成WM的类,该类继承至AssignerWithPeriodicWatermarks。
规则是评估一个最大乱序时间即可
flink已封装好一个常用的周期性生成WM的类,该类继承至AssignerWithPeriodicWatermarks。
规则是评估一个最大乱序时间即可
val stream: DataStream[SensorReading] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
new SensorTimeAssigner
)
class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
// 抽取时间戳
override def extractTimestamp(r: SensorReading): Long = r.timestamp
}
>> relust: E(10), W(0), E(8), E(7), E(11), W(1), ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
new SensorTimeAssigner
)
class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
// 抽取时间戳
override def extractTimestamp(r: SensorReading): Long = r.timestamp
}
>> relust: E(10), W(0), E(8), E(7), E(11), W(1), ...
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
2.自定义
如果不想用BoundedOutOfOrdernessTimesTampExtractor,则可以自定义实现
AssignerWithPeriodicWatermarks
如果不想用BoundedOutOfOrdernessTimesTampExtractor,则可以自定义实现
AssignerWithPeriodicWatermarks
系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
适用比较密集,周期性的生成。为什么不每条数据都产生一个watermark?因为大数据量性能问题
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔5秒产生一个watermark
env.getConfig.setAutoWatermarkInterval(5000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔5秒产生一个watermark
env.getConfig.setAutoWatermarkInterval(5000)
产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 延时为1分钟
var maxTs: Long = Long.MinValue // 观察到的最大时间戳
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(r: SensorReading, previousTS: Long) = {
maxTs = maxTs.max(r.timestamp)
r.timestamp
}
}
val bound: Long = 60 * 1000 // 延时为1分钟
var maxTs: Long = Long.MinValue // 观察到的最大时间戳
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(r: SensorReading, previousTS: Long) = {
maxTs = maxTs.max(r.timestamp)
r.timestamp
}
}
EvnetTime在window中的使用
滚动窗口(TumblingEventTimeWindows)
def main(args: Array[String]): Unit = {
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))
val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
set += ts
}
groupDstream.print("window::::").setParallelism(1)
env.execute()
}
}
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))
val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
set += ts
}
groupDstream.print("window::::").setParallelism(1)
env.execute()
}
}
滑动窗口(SlidingEventTimeWindows)
def main(args: Array[String]): Unit = {
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.milliseconds(500)))
val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
set += ts
}
groupDstream.print("window::::").setParallelism(1)
env.execute()
}
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.milliseconds(500)))
val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
set += ts
}
groupDstream.print("window::::").setParallelism(1)
env.execute()
}
会话窗口(EventTimeSessionWindows)
def main(args: Array[String]): Unit = {
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)) )
windowStream.reduce((text1,text2)=>
( text1._1,0L,text1._3+text2._3)
) .map(_._3).print("windows:::").setParallelism(1)
env.execute()
}
// 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
val arr: Array[String] = text.split(" ")
(arr(0), arr(1).toLong, 1)
}
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
override def extractTimestamp(element: (String, Long, Int)): Long = {
return element._2
}
})
val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
textKeyStream.print("textkey:")
val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)) )
windowStream.reduce((text1,text2)=>
( text1._1,0L,text1._3+text2._3)
) .map(_._3).print("windows:::").setParallelism(1)
env.execute()
}
ProcessFunction API(底层API)
转换算子无法访问时间戳信息和水位线信息
Low-Level转换算子
可以访问时间戳
可以访问watermark
可以注册定时事件
可以输出特定的一些事件,如超时事件
8个Process Function
- ProcessFunction
• KeyedProcessFunction
用来操作KeyedStream
处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口
都有open()、close()和getRuntimeContext()等方法
额外提供两个方法
• processElement(v: IN, ctx: Context, out: Collector[OUT])
流中的每一个元素都会调用这个方法
调用结果将会放在Collector数据类型中输出
Context可以访问元素的时间戳,元素的key,以及TimerService时间服务
Context还可以将结果输出到别的流(side outputs)
• onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
当之前注册的定时器触发时调用
参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合
OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
• CoProcessFunction
• ProcessJoinFunction
• BroadcastProcessFunction
• KeyedBroadcastProcessFunction
• ProcessWindowFunction
• ProcessAllWindowFunction
TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法
• currentProcessingTime(): Long 返回当前处理时间
• currentWatermark(): Long 返回当前watermark的时间戳
• registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。
当processing time到达定时时间时,触发timer。
当processing time到达定时时间时,触发timer。
• registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。
当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
• deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
• deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。
举例
侧输出流(SideOutPut)
大部分的datastream api是单一输出,是某种数据类型的流
split可以一条流拆分多流,但流的数据类型相同
process fucntion的sideoutput可产生多流,并且每个流的数据类型可以不一样
一个side output可以定义为OutputTag[X]对象
X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
val monitoredReadings: DataStream[SensorReading] = readings
.process(new FreezingMonitor)
monitoredReadings
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()
readings.print()
.process(new FreezingMonitor)
monitoredReadings
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()
readings.print()
CoprocessFunction
两条输入流,提供了操作每个输入流的方法:processElement1和processElement2
状态编程和容错机制
无状态
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警
有状态
有状态的计算则会基于多个事件输出结果
所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算
所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计
流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算
可以认为状态就是一个本地变量,可被任务的业务逻辑访问
为了使运行时的Flink了解算子的状态,算子需要预先注册其状态
两种类型的状态
算子状态(operator state)
作用范围限定为算子任务,
由同一并行任务所处理的所有数据都可以访问到相同的状态
状态对于同一任务而言是共享的
算子状态不能由相同或不同算子的另一个任务访问
三种基本数据结构
- 列表状态(List state)
- 联合列表状态(Union list state)
- 广播状态(Broadcast state)
键控状态(keyed state)
根据输入数据流中定义的键(key)来维护和访问的,不同的key有不同的状态
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态
当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key,具有相同key的所有数据都会访问相同的状态
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)
N种数据类型
o set操作: ValueState.update(value: T)
o ListState.addAll(values: java.util.List[T])
o ListState.get()返回Iterable[T]
o ListState.update(values: java.util.List[T])
o MapState.put(key: K, value: V)
o MapState.contains(key: K)
o MapState.remove(key: K)
- ValueState[T]保存单个的值,值的类型为T。
o set操作: ValueState.update(value: T)
- ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
o ListState.addAll(values: java.util.List[T])
o ListState.get()返回Iterable[T]
o ListState.update(values: java.util.List[T])
- MapState[K, V]保存Key-Value对。
o MapState.put(key: K, value: V)
o MapState.contains(key: K)
o MapState.remove(key: K)
- ReducingState[T]
- AggregatingState[I, O]
举例
状态编程
RichFlatMapFunction
flatMapState
状态一致性
引入状态,自然引入一致性
假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
一致性级别
at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。既不恢复丢失的状态,也不重播丢失的数据。语义含义是最多处理一次时间。
举例:视频流丢一帧没关系,如TCP握手连接,目标是侦测网络连接
举例:视频流丢一帧没关系,如TCP握手连接,目标是侦测网络连接
at-least-once: 也就是说,计数程序在发生故障后可能多算,但是绝不会少算,这表示计数结果可能大于正确值,但绝不会小于正确值
举例:1)统计当前数据出现过没有,出现了就true,没出现就false,多算没关系只要出现即可
举例:1)统计当前数据出现过没有,出现了就true,没出现就false,多算没关系只要出现即可
exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。没有事件丢失,只处理一次,恰好处理一次,内部状态也仅仅更新一次
storm和spark streaming保证exactly-once:无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束
Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,是在flink流处理器内部保证
除了流处理器以外还包含了数据源(kafka)和输出到持久化系统
端到端一致性保证,结果的正确性贯穿了整个流处理应用的始终,整个端到端的一致性级别取决于所有组件中一致性最弱的组件
各环节一致性划分
内部保证:依赖checkpoint
source端:外部源可重设数据的读取位置
sink端:保证从故障回复时,数据不会重复写入外部系统
幂等写入:
1.幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
如求导,hashmap,写入hbase
1.幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
如求导,hashmap,写入hbase
事务写入:
1.原子性,一系列操作要么全部成功,要么一个都不做。比如银行转账,要等到转入成功之后才可以转出
2.需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
1.原子性,一系列操作要么全部成功,要么一个都不做。比如银行转账,要等到转入成功之后才可以转出
2.需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
两种实现方式
预写日志(wal)
1.把结果数据当成状态保存,收到cp完成通知时,一次性写入sink
2.简单易于实现,耗时,不一定能达到效果,如一次性写入时失败怎么办? 完全重放会导致重复
3.DataStream API 提供了GenericWriteAheadSink模板类
1.把结果数据当成状态保存,收到cp完成通知时,一次性写入sink
2.简单易于实现,耗时,不一定能达到效果,如一次性写入时失败怎么办? 完全重放会导致重复
3.DataStream API 提供了GenericWriteAheadSink模板类
两阶段提交(2pc)
1.sink任务启动一个事务,将接下来的数据全部写入事务
2.写入外部sink时,只是预提交
3.收到cp通知时,才正式提交事务,实现真正写入
4.DataStream API 提供了TwoPhaseCommitSinkFunction 接口
1.sink任务启动一个事务,将接下来的数据全部写入事务
2.写入外部sink时,只是预提交
3.收到cp通知时,才正式提交事务,实现真正写入
4.DataStream API 提供了TwoPhaseCommitSinkFunction 接口
必须提供事务支持,sink任务必须能够模拟外部系统上的事务
提交事务必须是幂等操作
cp的间隔时间里,必须能够开启一个事务并接受数据写入
收到cp通知前,事务必须是等待提交状态
一致性检查点checkpoint
Flink的检查点算法
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(record => record._1)
.mapWithState( (in: (String, Int), state: Option[Int]) =>
state match {
case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )
case None => ( (in._1, in._2), Some(in._2) )
})
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(record => record._1)
.mapWithState( (in: (String, Int), state: Option[Int]) =>
state match {
case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )
case None => ( (in._1, in._2), Some(in._2) )
})
核心作用是确保状态正确,即使遇到程序中断,也要正确
概念
数项链珠子
在项链上每隔一段就松松地系上一根有色皮筋
当珠子被拨动的时候,皮筋也可以被拨动;
你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数
你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少
类似于皮筋标记,关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的。
这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态。如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单
这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态。如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单
检查点算法
(barrier对齐)
(barrier对齐)
简单的想法:暂停应用,保存状态到检查点
flink改进:chandy算法的分布式快照。检查点的保存和数据处理分离。
检查点分界线(checkpoint barrier),相当于数据流插入一条特殊数据。
检查点如何保存呢,是jobmanager发出指令
检查点如何保存呢,是jobmanager发出指令
checkpoint配置
flink+kafka
内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
flink由jobmanager协调各个tm进行checkpoint的存储,保存在statebackend中,默认statebackend是内存
1.第一条数据来了之后,开启一个 kafka 的事务(transaction)
2.正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
3.jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
4.sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
5.jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
6.sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
7.外部kafka关闭事务,提交的数据可以正常消费了。
2.正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
3.jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
4.sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
5.jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
6.sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
7.外部kafka关闭事务,提交的数据可以正常消费了。
3种状态后端
• MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;
而将checkpoint存储在JobManager的内存中。
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;
而将checkpoint存储在JobManager的内存中。
特点:快速、低延迟、但是不稳定。一般生产环境不用
• FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。
而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
将checkpoint存到远程的持久化文件系统(FileSystem)上。
而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val checkpointPath: String = ???
val backend = new RocksDBStateBackend(checkpointPath)
env.setStateBackend(backend)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))
val checkpointPath: String = ???
val backend = new RocksDBStateBackend(checkpointPath)
env.setStateBackend(backend)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))
特点:内存级别的访问速度,一定的容错性。但是当本地的状态需要特别大时,本地TM的JVM堆无法存储这么大的内存
• RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
将所有状态序列化后,存入本地的RocksDB中存储。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
</dependency>
特点:当做内嵌KV的数据库使用,可落盘,就不存在内存的限制,不会出现OOM.
但是序列化和反序列化非常耗时
但是序列化和反序列化非常耗时
保存点(Savepoints)
自定义的镜像保存功能
算法与checkpoints完全相同,可以认为就是具有一些额外元数据的检查点
与checkpoints不同,不会自动创建保存点,需要明确触发创建操作
优势
有计划的手动备份:时间点,什么状态,什么功能保存
版本迁移:如flink1.9迁移到1.20
暂停和重启应用:比如不重要的应用给重要应用让路资源时
更新应用程序
Table API与SQL
pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
简单举例
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// 基于env创建 tableEnv
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// 从一条流创建一张表
val dataTable: Table = tableEnv.fromDataStream(dataStream)
// 从表里选取特定的数据
val selectedTable: Table = dataTable.select('id, 'temperature)
.filter("id = 'sensor_1'")
//直接写sql
tableEnv.createTemporaryView("dataTable",dataTable)
val sql ="select id from dataTable where id = 'sensor_1' "
val resultSqlTable = tableEnv.sqlQuery(sql)
resultSqlTable.toAppendStream[(String,Double)].print("result")
val selectedStream: DataStream[(String, Double)] = selectedTable
.toAppendStream[(String, Double)]
selectedStream.print()
env.execute("table test")
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// 基于env创建 tableEnv
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// 从一条流创建一张表
val dataTable: Table = tableEnv.fromDataStream(dataStream)
// 从表里选取特定的数据
val selectedTable: Table = dataTable.select('id, 'temperature)
.filter("id = 'sensor_1'")
//直接写sql
tableEnv.createTemporaryView("dataTable",dataTable)
val sql ="select id from dataTable where id = 'sensor_1' "
val resultSqlTable = tableEnv.sqlQuery(sql)
resultSqlTable.toAppendStream[(String,Double)].print("result")
val selectedStream: DataStream[(String, Double)] = selectedTable
.toAppendStream[(String, Double)]
selectedStream.print()
env.execute("table test")
}
基本程序结构
//创建
val tableEnv = ...
val tableEnv = ...
//创建一张读表
tableEnv.connect(...).createTemporaryTable("inputTable")
tableEnv.connect(...).createTemporaryTable("inputTable")
//注册一张输出表
tableEnv.connect(...)..createTemporaryTable("outputTable")
tableEnv.connect(...)..createTemporaryTable("outputTable")
//通过table api查询算子,得到一张结果表
val result = tableEnv.from("inputTable").select(...)
val result = tableEnv.from("inputTable").select(...)
//通过sql查询语句,得到一张表
val sqlResult = tableEnv.sqlQuery("select .. from ..")
val sqlResult = tableEnv.sqlQuery("select .. from ..")
//将结果写入输出表
sqlResult.insertInto("outputTable")
sqlResult.insertInto("outputTable")
执行环境
基于老版本planner流处理
val settings = EnvrionmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val oldStreamTableEnv = StreamTableEnvironment.create(env,settings)
.useOldPlanner()
.inStreamingMode()
.build()
val oldStreamTableEnv = StreamTableEnvironment.create(env,settings)
基于老版本planner批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv =BatchTableEnvironment.create(batchEnv)
val oldBatchTableEnv =BatchTableEnvironment.create(batchEnv)
基于新版本blink流处理
val settings = EnvrionmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env,settings)
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env,settings)
基于新版本blink批处理
val settings = EnvrionmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchTableEnv = TableEnvironment.create(settings)
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchTableEnv = TableEnvironment.create(settings)
表的概念
TableEnvironment可以注册目录Catalog,并基于Catalog注册表
表由一个identifier来制定
Catalog名
数据库
对象名
创建表
调用.connect()方法,连接外部系统
tableEnv.connect()
.withFormat(..)
.withSchema(...)
.createTemporaryTbale(..)
.withFormat(..)
.withSchema(...)
.createTemporaryTbale(..)
读取文件
val filePath="/../../file"
tableEnv.connect(new FileSystem().path(filepath))
.withFormat(new OldCsv())
//OldCsv弃用,引入flink-csv包,切换成新的csv接口
.withSchema(new Schema()
.field("id",DataTypes.STRING))
.createTemporaryTable("inputTable")
val inputTable:Table = tableEnv.from("inputTable")
inputTable.toAppendStream[(String)].print()
env.execute("xx")
tableEnv.connect(new FileSystem().path(filepath))
.withFormat(new OldCsv())
//OldCsv弃用,引入flink-csv包,切换成新的csv接口
.withSchema(new Schema()
.field("id",DataTypes.STRING))
.createTemporaryTable("inputTable")
val inputTable:Table = tableEnv.from("inputTable")
inputTable.toAppendStream[(String)].print()
env.execute("xx")
读取kafka
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zk.connect","")
.property("btstrp.servers",""))
.withFormat()
.withSchema()
.createTemporaryTable("kafkaInputTable")
val inputTable :Table = tableEnv.from("kafkaInputTable")
inputTable.toAppendStream[("String")].print()
env.execute("kafka test")
.version("0.11")
.topic("sensor")
.property("zk.connect","")
.property("btstrp.servers",""))
.withFormat()
.withSchema()
.createTemporaryTable("kafkaInputTable")
val inputTable :Table = tableEnv.from("kafkaInputTable")
inputTable.toAppendStream[("String")].print()
env.execute("kafka test")
查询转换
DataStream转换成表
数据类型与schema的对应
1.基于名称(name-based)-都可改名
val s = tableEnv.fromDataStream(dataStream,'timestamp as 'ts,'id as 'myId,'temporary)
2.基于位置(position-based)-都可改名
val s = tableEnv.fromDataStream(dataStream,'abc,'cdf)
表转换成DataStream
需要制定数据类型,要将每一行转换成数据类型
表作为流式查询结果,是动态更新
转换有两种模式:追加(Append)撤回(Retract)
创建临时视图
createTemporaryView
表的输出
tableEnv.connect()
.createTemporaryTable("outputTable")
val rt:Table = ..
rt.insertInto("outtable")
.createTemporaryTable("outputTable")
val rt:Table = ..
rt.insertInto("outtable")
输出到文件
输出到hive
输出到kafka
输出到es
upsert模式
输出到mysql
table.toAppendStream
table.toRetractStream
聚合,有状态数据,类似拉链,对过期的数据作废。
key不断加和,过期的值打个false的标记
更新模式
追加(Append)模式
1.只做插入操作
1.只做插入操作
撤回(Retract)模式
1.添加add和撤回(retract)消息
2.插入、删除、更新
1.添加add和撤回(retract)消息
2.插入、删除、更新
更新插入(Upsert)
查看执行计划
TableEnvironment.explain()方法,返回一个字符串,描述三个计划
1.优化的逻辑查询计划
2.优化后的逻辑查询计划
3.实际执行的计划
val explain:String = tableEnv.explain(rstable)
println(explain)
1.优化的逻辑查询计划
2.优化后的逻辑查询计划
3.实际执行的计划
val explain:String = tableEnv.explain(rstable)
println(explain)
动态表
如果流中的数据类型是case class可以直接根据case class的结构生成table
tableEnv.fromDataStream(dataStream)
或者根据字段顺序单独命名
tableEnv.fromDataStream(dataStream,’id,’timestamp .......)
最后的动态表可以转换为流进行输出
table.toAppendStream[(String,String)]
tableEnv.fromDataStream(dataStream)
或者根据字段顺序单独命名
tableEnv.fromDataStream(dataStream,’id,’timestamp .......)
最后的动态表可以转换为流进行输出
table.toAppendStream[(String,String)]
动态表随时间变化
持续查询
永不终止,并会生成另一个动态表
字段
用一个单引放到字段前面来标识字段名, 如 ‘name , ‘id ,’amount
tableapi窗口聚合操作
// 统计每10秒中每个传感器温度值的个数
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// 基于env创建 tableEnv
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// 从一条流创建一张表,按照字段去定义,并指定事件时间的时间字段
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
// 按照时间开窗聚合统计
val resultTable: Table = dataTable
.window( Tumble over 10.seconds on 'ts as 'tw )
.groupBy('id, 'tw)
.select('id, 'id.count)
val selectedStream: DataStream[(Boolean, (String, Long))] = resultTable
.toRetractStream[(String, Long)]
selectedStream.print()
env.execute("table window test")
}
GROUP BY
如果了使用 groupby,table转换为流的时候只能用toRetractDstream
val dataStream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
2. toRetractDstream 得到的第一个boolean型字段标识 true就是最新的数据(Insert),false表示过期老数据(Delete)
val dataStream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
dataStream.filter(_._1).print()
3. 如果使用的api包括时间窗口,那么窗口的字段必须出现在groupBy中。
val resultTable: Table = dataTable
.window( Tumble over 10.seconds on 'ts as 'tw )
.groupBy('id, 'tw)
.select('id, 'id.count)
函数
内置函数
比较函数
sql
v1 = v2
v1 > v2
v1 > v2
tableapi
any1 === any2
any1 > any2
any1 > any2
子主题
逻辑函数
sql
boolean1 or boolean2
boolean1 IS FALSE
NOT boolean
boolean1 IS FALSE
NOT boolean
tableapi
||
.isFalse
!BOOLEAN
.isFalse
!BOOLEAN
算数函数
sql
+
power(n1,n2)
power(n1,n2)
tableapi
+
n1.power(n2)
n1.power(n2)
其他函数
用户自定义函数(UDF)
标量函数(Scalar Functions)
一对一
一对一
表函数(Table Functions)
一对多
一对多
聚合函数(UDAF)
把一个表中数据,聚合成一个标量值
(多对一)
把一个表中数据,聚合成一个标量值
(多对一)
继承AggregationFunction
工作原理
首先,他需要一个累加器,保存聚合中间结果的数据结构。
可以createAccumulator()方法创建空累加器
可以createAccumulator()方法创建空累加器
随后,对每个输入行调用函数的accumulate()方法来更新累加器
处理完所有行,调用函数的getValue()方法计算并返回最终结果
必须实现三个方法
getValue()
accumulate()
createAccumulator()
举例
表聚合函数(TableAggreatieFunctions)
(多对多) top值
(多对多) top值
时间窗口
定义处理时间(Processing Time)-三种方法
1. DataStream转换成表时指定。
如果是processTime直接在创建动态表时进行追加就可以。
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ps.proctime)
如果是processTime直接在创建动态表时进行追加就可以。
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ps.proctime)
2.DDL定义
3.定义TableSchema时指定
定义处理时间(Processing Time)-三种方法
1. 如果是EventTime要在创建动态表时声明。a.可以追加指定 b.也可以直接用timestamp指定
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
2.DDL定义
3.定义 Tableschema时指定
窗口,主要两种
Group Windows(分组窗口)
根据时间或行计数间隔
根据时间或行计数间隔
Table API中使用Group Windows
滚动窗口可以使用Tumble over 10000.millis on 来表示
val resultTable: Table = dataTable
.window( Tumble over 10.seconds on 'ts as 'tw ) //定义窗口,别名为tw
.groupBy('id, 'tw) //按照字段和窗口tw分组
.select('id, 'id.count) //聚合
滚动窗口可以使用Tumble over 10000.millis on 来表示
val resultTable: Table = dataTable
.window( Tumble over 10.seconds on 'ts as 'tw ) //定义窗口,别名为tw
.groupBy('id, 'tw) //按照字段和窗口tw分组
.select('id, 'id.count) //聚合
滚动窗口
滑动窗口
会话窗口
SQL中使用Group Windows
Over Windows
针对每个输入行,计算相邻行范围内的聚合
针对每个输入行,计算相邻行范围内的聚合
TableAPI使用over window
无界over windows
子主题
有界over windows
sql使用over window
编写SQL
// 统计每10秒中每个传感器温度值的个数
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// 基于env创建 tableEnv
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// 从一条流创建一张表,按照字段去定义,并指定事件时间的时间字段
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
// 直接写sql完成开窗统计
val resultSqlTable: Table = tableEnv.sqlQuery("select id, count(id) from "
+ dataTable + " group by id, tumble(ts, interval '15' second)")
val selectedStream: DataStream[(Boolean, (String, Long))] = resultSqlTable.toRetractStream[(String, Long)]
selectedStream.print()
env.execute("table window test")
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// 基于env创建 tableEnv
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// 从一条流创建一张表,按照字段去定义,并指定事件时间的时间字段
val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
// 直接写sql完成开窗统计
val resultSqlTable: Table = tableEnv.sqlQuery("select id, count(id) from "
+ dataTable + " group by id, tumble(ts, interval '15' second)")
val selectedStream: DataStream[(Boolean, (String, Long))] = resultSqlTable.toRetractStream[(String, Long)]
selectedStream.print()
env.execute("table window test")
}
Flink的监控和优化
metrics
metric类型
counter
对一个计数器进行累加,对于多条或多兆数据一直往上加的过程
Gauge
最简单的metrics,反映一个值。java heap内存用多少,可以实时暴露一个gauge
meter
Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间
Histogram
Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等
metric group
Metric 在 Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识
•TaskManagerMetricGroup
•TaskManagerJobMetricGroup
•TaskMetricGroup
•TaskIOMetricGroup
•OperatorMetricGroup
•${User-defined Group} / ${User-defined Metrics}
•OperatorIOMetricGroup
•JobManagerMetricGroup
•JobManagerJobMetricGroup
•TaskManagerJobMetricGroup
•TaskMetricGroup
•TaskIOMetricGroup
•OperatorMetricGroup
•${User-defined Group} / ${User-defined Metrics}
•OperatorIOMetricGroup
•JobManagerMetricGroup
•JobManagerJobMetricGroup
Table API & SQL
表api
SQL
统一流和批,很容易被人接受
依赖
表api: flink-table-planner_2.11
批处理查询:flink-tabl-api-java/scala-brige_2.11
流处理:flink-streaming-scala_2.11
kafka依赖:flink-connector-kafka
json依赖:flink-json
Stream SQL
获取运行环境(sEnv)
getExecutionEnvironment
获取表api运行环境(sTableEnv)
StreamTableEnvironment.create
读取数据源
数据转换
new MapFunction(String,Tuple2<String,String>)
DataStream转为table
sTableEnv.fromDataStream(ds,"field1,field2")
注册为一个表
registerTable
sqlQuery
Table table2 = sTableEnv.sqlQuery().select("fields")
表转换为流
toAppendStream(table2,String.Class)
执行
Batch SQL
获取运行环境
创建Batch环境
子主题
读取 数据返回DataSet
注册为一个表
tEnv.registerDataSet(name="user1",ds,fileds="")
sqlQuery
Table table= tEnv.sqlQuery("select * from user1")
表转为Dataset
toDataSet
Batch Table
Table table1 = tEnv.FromDataSet(ds2,fields:"uid,name")
Table table2 =table1.select("name")
tEnv.toDataSet(table2,String.class)
Table API & SQL流处理
动态表和连续查询
动态表会随时间变化
查询动态表生成的是连续查询
查询不断更新其结果表
流上定义表
连续查询
滚动窗口
表转流
流查询的结果表被动态更新,随着新纪录到达发生变化
两种模式
Append-only Mode
insert,且以前结果永不更新
insert,且以前结果永不更新
ds.flatMap(new FlatMapFunction<String,Tuple2<String,Integer>>)
流转为表
Table table sTableEnv.fromDataStream(ds2)
sTableEnv.registerTable(name="wc",table)
append mod
sTableEnv.toAppendStream(table,Row.class)
分组统计涉及到更新,如果用append则报错
Retract Mode
始终都可以使用此模式
返回true,代表插入
返回false,代表数据撤回
sTableEnv.toRetractStream(table,Row.class)
flink sql connector读取kafka
sTableEnv.connect(new Kafka().version()
.topci().startEaliest()
.property("group.id","group1")
.property("boostrap.servers",""))
.withFormat(new Json().failOnMissingField)
.withSchema(new Schema().field("user1",Types.Long).field().filed())
.inAppendMode.registerTableSource()
.topci().startEaliest()
.property("group.id","group1")
.property("boostrap.servers",""))
.withFormat(new Json().failOnMissingField)
.withSchema(new Schema().field("user1",Types.Long).field().filed())
.inAppendMode.registerTableSource()
查询转换为table
table转为流
追加模式
Table API
tEnv.scan
filter/groupBy/select
SQL操作
基于Apache Calcite
系统架构
组件栈
Deploy层
flink部署模式 Single JVM
Cluster
standalone\yarn
standalone\yarn
cloud
gce,ec2
gce,ec2
Runtime层
主要负责flink的核心实现,支持分布式stream处理,jobgraph到executiongraph的映射、调度
API层
DataStream API
DataSetAPI
Library层
CEP
EventProcessing
对复杂事件处理的库
对复杂事件处理的库
Table API & SQL
FlinkML机器学习\Gelly图计算
0 条评论
下一页