flink
2022-04-07 13:19:28 0 举报
AI智能生成
flink知识点总结
作者其他创作
大纲/内容
维度表关联
实时查询维表
查询小数据量的维表情况下才使用这种方式,并且要妥善处理连接外部系统的线程,一般还会用到线程池。
预加载全量数据
当我们的系统启动时,就将维度表数据全部加载到内存中,然后数据在内存中进行关联,不需要直接访问外部数据库。一旦维表数据发生更新,Flink 任务是无法感知,可以采取定时拉取维表数据
对计算节点的内存消耗很高,所以不能适用于数量很大的维度表
对计算节点的内存消耗很高,所以不能适用于数量很大的维度表
适用于那些实时场景不是很高,维表数据较小的场景
LRU 缓存
如果维表的数据比较大,无法一次性全部加载到内存中,可以使用LRU策略加载维表数据。
利用 Flink 的 RichAsyncFunction 读取 Hbase 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 Hbase,然后插入到缓存中
将维表消息广播出去
广播
实例
//1:初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
//2:广播数据
.withBroadcastSet(toBroadcast, "broadcastSetName");
//3:获取数据
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
重分区
重分区算子用来对数据进行重新分区,可以用来解决数据倾斜问题
种类
Random Partitioning
dataStream.shuffle()
根据均匀分布随机分配元素,(类似于random.nextInt(3),0 - 3 在概率上是均匀的)
Rebalancing
dataStream.rebalance()
分区元素循环,每个分区创建相等的负载。数据发生倾斜的时候可以用于性能优化
对数据集进行再平衡,重分组,消除数据倾斜
Rescaling
dataSteam.rescale()
rescale与rebalance很像,也是将数据均匀分布到各下游各实例上,但它的传输开销更小,因为rescale并不是将每个数据轮询地发送给下游每个实例,而是就近发送给下游实例
Custom Partitioning
自定义分区需要时间Paritition接口
dataStream.partitionCustom(partitioner, “someKey”)
或者dataStream.partitionCustom(partitioner,0)
并发
推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。
为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数
如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据
需要设置 Flink 的 Redistributing,也就是数据重分配。
数据重分配-再分区
实例
dataStream
.setParallelism(2)
// 采用REBALANCE分区策略重分区
.rebalance() //.rescale()
.print()
.setParallelism(4);
.setParallelism(2)
// 采用REBALANCE分区策略重分区
.rebalance() //.rescale()
.print()
.setParallelism(4);
Rebalance 分区策略,数据会以 round-robin 的方式对数据进行再次分区,可以全局负载均衡。
Rescale 分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中
状态
Flink重启策略
Flink设置环境
// 1 设置全局并行度设置为3 kafka分区数为6
env.setParallelism(3);
env.setParallelism(3);
// 2. 设置 checkPoint
// 2.1 开启Checkpoint,每 5 分钟做一次CK
env.enableCheckpointing(3000000L);
// 2.2 设置 ck 超时时间 为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 2.3 指定CK的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 2.4 设置任务关闭的时候保留最后一次CK数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.1 开启Checkpoint,每 5 分钟做一次CK
env.enableCheckpointing(3000000L);
// 2.2 设置 ck 超时时间 为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 2.3 指定CK的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 2.4 设置任务关闭的时候保留最后一次CK数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 3. 设置自动重启策略, 重试 3 次, 间隔10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
// 4. 设置状态后端
env.setStateBackend(new FsStateBackend(CHECK_POINT_PATH + "/" + jobName));
env.setStateBackend(new FsStateBackend(CHECK_POINT_PATH + "/" + jobName));
窗口
骨架
// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
分组
keyBy
分组
windowAll
不分组窗口
所有数据将发送给下游的单个实例,或者说下游算子的并行度为1
窗口分配器(WindowAssigner)
一种基于数量(Count-based Window)
不保证顺序
一种基于时间(Time-based Window):TimeWindow
滚动窗口:Tumbling
TumblingEventTimeWindows
TumblingProcessingTimeWindows
滑动窗口:Sliding
SlidingEventTimeWindows
SlidingProcessingTimeWindows
会话窗口:Session
EventTimeSessionWindows
ProcessingTimeSessionWindows
DynamicProcessingTimeSessionWindows
窗口函数
增量计算
增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中
ReduceFunction
实例
case class StockPrice(symbol: String, price: Double)
val input: DataStream[StockPrice] = ...
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// reduce的返回类型必须和输入类型StockPrice一致
val sum = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
val input: DataStream[StockPrice] = ...
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// reduce的返回类型必须和输入类型StockPrice一致
val sum = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
AggregateFunction
实例
case class StockPrice(symbol: String, price: Double)
// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
override def createAccumulator() = ("", 0, 0)
override def add(item: StockPrice, accumulator: (String, Double, Int)) =
(item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)
override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
(a._1 ,a._2 + b._2, a._3 + b._3)
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.aggregate(new AverageAggregate)
// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
override def createAccumulator() = ("", 0, 0)
override def add(item: StockPrice, accumulator: (String, Double, Int)) =
(item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)
override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
(a._1 ,a._2 + b._2, a._3 + b._3)
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.aggregate(new AverageAggregate)
全量计算
全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。
ProcessWindowFunction
实例
//对价格出现的次数做了统计,选出出现次数最多的输出出来。
case class StockPrice(symbol: String, price: Double)
class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
// 股票价格和该价格出现的次数
var countMap = scala.collection.mutable.Map[Double, Int]()
for(element <- elements) {
val count = countMap.getOrElse(element.price, 0)
countMap(element.price) = count + 1
}
// 按照出现次数从高到低排序
val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)
// 选出出现次数最高的输出到Collector
if (sortedMap.size > 0) {
out.collect((key, sortedMap(0)._1))
}
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val frequency = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.process(new FrequencyProcessFunction)
case class StockPrice(symbol: String, price: Double)
class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
// 股票价格和该价格出现的次数
var countMap = scala.collection.mutable.Map[Double, Int]()
for(element <- elements) {
val count = countMap.getOrElse(element.price, 0)
countMap(element.price) = count + 1
}
// 按照出现次数从高到低排序
val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)
// 选出出现次数最高的输出到Collector
if (sortedMap.size > 0) {
out.collect((key, sortedMap(0)._1))
}
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val frequency = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.process(new FrequencyProcessFunction)
ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。比如,每天的数据在TB级,我们需要Slide为十分钟Size为一小时的滑动窗口,这种设置会导致窗口数量很多,而且一个元素会被复制好多份分给每个所属的窗口,这将带来巨大的内存压力。
ProcessWindowFunction与增量计算相结合
对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。
实例
Lambda函数对所有内容进行最大值和最小值的处理,这一步是增量计算。计算的结果以数据类型(String, Double, Double)传递给WindowEndProcessFunction,WindowEndProcessFunction只需要将窗口结束的时间戳添加到结果MaxMinPrice中即可。
实例
case class StockPrice(symbol: String, price: Double)
case class MaxMinPrice(symbol: String, max: Double, min: Double, windowEndTs: Long)
class WindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MaxMinPrice, String, TimeWindow] {
override def process(key: String,
context: Context,
elements: Iterable[(String, Double, Double)],
out: Collector[MaxMinPrice]): Unit = {
val maxMinItem = elements.head
val windowEndTs = context.window.getEnd
out.collect(MaxMinPrice(key, maxMinItem._2, maxMinItem._3, windowEndTs))
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
// reduce的返回类型必须和输入类型相同
// 为此我们将StockPrice拆成一个三元组 (股票代号,最大值、最小值)
val maxMin = input
.map(s => (s.symbol, s.price, s.price))
.keyBy(s => s._1)
.timeWindow(Time.seconds(10))
.reduce(
((s1: (String, Double, Double), s2: (String, Double, Double)) => (s1._1, Math.max(s1._2, s2._2), Math.min(s1._3, s2._3))),
new WindowEndProcessFunction
)
触发器:Trigger
决定何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理
增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去
全量计算窗口函数需要将窗口内的元素缓存,Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去
每个窗口都有一个默认的Trigger,比如前文这些例子都是基于Processing Time的时间窗口,当到达窗口的结束时间时,Trigger以及对应的计算被触发
如果我们有一些个性化的触发条件,比如窗口中遇到某些特定的元素、元素总数达到一定数量或窗口中的元素到达时满足某种特定的模式时,我们可以自定义一个Trigger
甚至可以在Trigger中定义一些提前计算的逻辑,比如在Event Time语义中,虽然Watermark还未到达,但是我们可以定义提前计算输出的逻辑,以快速获取计算结果,获得更低的延迟。
WindowAssigner的默认trigger
Event Time的窗口会有一个EventTimeTrigger
每当窗口的Watermark时间戳到达窗口的结束时间,Trigger会发送FIRE
ProcessingTimeTrigger对应Processing Time窗口
CountTrigger对应Count-based窗口。
TriggerResult的结果
CONTINUE:什么都不做
FIRE:启动计算并将结果发送给下游,不清理窗口数据。
PURGE:清理窗口数据但不执行计算。
FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据。
自定义Trigger
在股票或任何交易场景中,我们比较关注价格急跌的情况,默认窗口长度是60秒,如果价格跌幅超过5%,则立即执行Window Function,如果价格跌幅在1%到5%之内,那么10秒后触发Window Function。
实例
class MyTrigger extends Trigger[StockPrice, TimeWindow] {
override def onElement(element: StockPrice,
time: Long,
window: TimeWindow,
triggerContext: Trigger.TriggerContext): TriggerResult = {
val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
// 设置返回默认值为CONTINUE
var triggerResult: TriggerResult = TriggerResult.CONTINUE
// 第一次使用lastPriceState时状态是空的,需要先进行判断
// 状态数据由Java端生成,如果是空,返回一个null
// 如果直接使用Scala的Double,需要使用下面的方法判断是否为空
if (Option(lastPriceState.value()).isDefined) {
if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
// 如果价格跌幅大于5%,直接FIRE_AND_PURGE
triggerResult = TriggerResult.FIRE_AND_PURGE
} else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % 10 * 1000))
// 给10秒后注册一个Timer
triggerContext.registerProcessingTimeTimer(t)
}
}
lastPriceState.update(element.price)
triggerResult
}
// 我们不用EventTime,直接返回一个CONTINUE
override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.FIRE_AND_PURGE
}
override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPrice", classOf[Double]))
lastPrice.clear()
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(60))
.trigger(new MyTrigger)
.aggregate(new AverageAggregate)
override def onElement(element: StockPrice,
time: Long,
window: TimeWindow,
triggerContext: Trigger.TriggerContext): TriggerResult = {
val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
// 设置返回默认值为CONTINUE
var triggerResult: TriggerResult = TriggerResult.CONTINUE
// 第一次使用lastPriceState时状态是空的,需要先进行判断
// 状态数据由Java端生成,如果是空,返回一个null
// 如果直接使用Scala的Double,需要使用下面的方法判断是否为空
if (Option(lastPriceState.value()).isDefined) {
if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
// 如果价格跌幅大于5%,直接FIRE_AND_PURGE
triggerResult = TriggerResult.FIRE_AND_PURGE
} else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % 10 * 1000))
// 给10秒后注册一个Timer
triggerContext.registerProcessingTimeTimer(t)
}
}
lastPriceState.update(element.price)
triggerResult
}
// 我们不用EventTime,直接返回一个CONTINUE
override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.FIRE_AND_PURGE
}
override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPrice", classOf[Double]))
lastPrice.clear()
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(60))
.trigger(new MyTrigger)
.aggregate(new AverageAggregate)
在自定义Trigger时,如果使用了状态,一定要使用clear方法将状态数据清理,否则随着窗口越来越多,状态数据会越积越多。
清除器:Evictor
清除器(Evictor)是在WindowAssigner和Trigger的基础上的一个可选选项,用来清除一些数据。我们可以在Window Function执行前或执行后调用Evictor。
evictBefore和evictAfter分别在Window Function之前和之后被调用,窗口的所有元素被放在了Iterable<TimestampedValue<T>>,我们要实现自己的清除逻辑。当然,对于增量计算的ReduceFunction和AggregateFunction,我们没必要使用Evictor。
Flink提供了几个实现好的Evictor:
CountEvictor保留一定数目的元素,多余的元素按照从前到后的顺序先后清理
TimeEvictor保留一个时间段的元素,早于这个时间段的元素会被清理。
https://zhuanlan.zhihu.com/p/102325190
水位线 Watermaker
what
为了解决数据到达 Flink 之前发生的乱序问题,用 EventTime 和 WaterMark 进行配合使用。
was
水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素
如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。
也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记
在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。
如何计算Watermaker
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:
watermark 时间 >= window_end_time;
在 [window_start_time,window_end_time) 中有数据存在,这个窗口是左闭右开的
Watermaker有什么用?
之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的!
Watermaker是用来触发窗口计算的!
Watermaker如何触发窗口计算的?
窗口计算的触发条件为:
1.窗口中有数据
2.Watermaker >= 窗口的结束时间
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
实例
标准
/**
* @author WGR
* @create 2021/9/13 -- 15:18
*/
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
SensorReading sensorReading = new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
System.out.println(sensorReading.toString());
return sensorReading;
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
* @author WGR
* @create 2021/9/13 -- 15:18
*/
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
SensorReading sensorReading = new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
System.out.println(sensorReading.toString());
return sensorReading;
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
OutputTag+allowedLateness
/**
* @author WGR
* @create 2021/9/14 -- 11:04
*/
public class WindowTest4_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
}
}
* @author WGR
* @create 2021/9/14 -- 11:04
*/
public class WindowTest4_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
}
}
自我总结:
watermarker是将窗口触发时间延迟n秒,用以接收延迟到来的时间窗口期内的eventtime范围时间,窗口开始结束时间是严格的窗口时间范围,但是出发结束的时间被延迟了。
滑动窗口是将窗口前沿时间扩大到n时间以前,扩大统计时间范围
滚动窗口是 统计时间和滑动时间相同的滑动窗口
refer
https://blog.csdn.net/q322625/article/details/110176094
https://www.cnblogs.com/dalianpai/p/15268363.html
checkpoint
backpress
特点
短时间内流量陡增造成数据的堆积或者消费速度变慢
数据的消费速度小于数据的生产速度
危害
导致checkpoint超时
影响state大小:拖慢checkpoint,设置到时OOM
定位
排查时先把operator chain禁用,方便定位到具体算子
Flink Web Ul
该节点发送速率赶不上数据接收速率
下游节点接收速率较慢,通过反压限制了该节点的发送
利用metic定位
原因
数据倾斜
可以在 Flink 的后台管理页面看到每个 Task 处理数据的大小。当数据倾斜出现时,通常是简单地使用类似 KeyBy 等分组聚合函数导致的,需要用户将热点 Key 进行预处理,降低或者消除热点 Key 的影
代码执行效率问题
通过查看运行机器节点的 CPU 和内存情况定位问题
分析GC情况
通过 -XX:+PrintGCDetails 参数查看 GC 的日志
外部组件交互
0 条评论
下一页