Flink调优总结
2023-07-01 15:42:58 40 举报
AI智能生成
Flink调优主要包括内存管理、并行度设置、任务链优化等方面。首先,合理配置内存资源,避免内存溢出或浪费;其次,根据数据量和计算复杂度调整并行度,提高任务执行效率;再者,优化任务链,减少中间结果的传输和存储开销。此外,还需关注网络拓扑结构、作业调度策略等细节,以提高整体性能。总之,Flink调优需要综合考虑多个方面,通过不断尝试和调整,找到最佳配置,实现高效稳定的数据处理。
作者其他创作
大纲/内容
五、Job优化
使用DataGen造数据
DataStream的DataGenerator
public class DataGenTest {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(new DataGeneratorSource<>(new RandomGenerator<OrderInfo>() {
@Override
public OrderInfo next() {
return OrderInfo.builder()
.id( random.nextInt(1, 100000))
.userId( random.nextInt(1, 100000))
.totalAmount(random.nextUniform(1, 1000))
.createTime(System.currentTimeMillis())
.build();
}
})).returns(Types.POJO(OrderInfo.class));
orderInfoDS.print("orderInfoDS");
env.execute("");
}
}
SQL的DataGenerator
public class DataGenSQLTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String orderSql="CREATE TABLE order_info (\n" +
" id INT,\n" +
" userId BIGINT,\n" +
" totalAmount DOUBLE,\n" +
" createTime AS localtimestamp,\n" +
" WATERMARK FOR createTime AS createTime\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='20000',\n" +
" 'fields.id.kind'='sequence',\n" +
" 'fields.id.start'='1',\n" +
" 'fields.id.end'='100000000',\n" +
" 'fields.user_id.kind'='random',\n" +
" 'fields.user_id.min'='1',\n" +
" 'fields.user_id.max'='1000000',\n" +
" 'fields.total_amount.kind'='random',\n" +
" 'fields.total_amount.min'='1',\n" +
" 'fields.total_amount.max'='1000'\n" +
")";
tableEnv.executeSql(orderSql);
tableEnv.executeSql("select * from order_info").print();
}
}
算子指定UUID
算子 UID 是根据 JobGraph 自动生成的,JobGraph 的更改可能会导致
UUID 改变。手动指定算子 UUID ,可以让 Flink 有效地将算子的状态从 savepoint 映
射到作业修改后(拓扑图可能也有改变)的正确的算子上
.uid("uv-reduce").name("uv-reduce")
链路延迟测量
metrics.latency.interval: 30000 #默认 0,表示禁用,单位毫秒
监控的粒度
metrics.latency.granularity: operator #默认 operator
single:每个算子单独统计延迟
operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟
subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间
的延迟
开启对象重用
当调用了 enableObjectReuse 方法后,Flink 会把中间深拷贝的步骤都省略掉,
SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。但需要
特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的
Function 均不会改变对象内部的值。否则可能会有线程安全的问题。
SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。但需要
特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的
Function 均不会改变对象内部的值。否则可能会有线程安全的问题。
细粒度滑动窗口优化
解决思路:滚动窗口+在线存储+读时聚合
六、FlinkSQL调优
设置空闲状态保留时间
FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在
状态里,不会清理!要么设置 TTL,要么使用 FlinkSQL 的 interval join
使用 Top-N 语法进行去重,重复数据的出现一般都位于特定区间内(例如一小时
或一天内),过了这段时间之后,对应的状态就不再需要了
开启MiniBatch
MiniBatch 是微批处理,原理是缓存一定的数据后再触发处理,以减少对 State 的访问,
从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程
来触发微批,需要消耗一定的线程调度性能
从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程
来触发微批,需要消耗一定的线程调度性能
// 默认关闭,开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
适用场景
微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通
常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启
开启LocalGlobal
原理概述
LocalGlobal 优 化 将 原 先 的 Aggregate 分 成 Local+Global 两 阶 段 聚 合 , 即
MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据
进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将收到
的 Accumulator 合并(Merge),得到最终的结果(GlobalAgg)
LocalGlobal 开启方式
1)LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数
2)table.optimizer.agg-phase-strategy: 聚合策略。默认 AUTO,支持参数 AUTO、
TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)。
LocalGlobal 优化对普通聚合(例如 SUM、COUNT、MAX、MIN 和 AVG)有较好的效果
开启Split Distinct
原理概述
从 Flink1.9.0 版 本 开 始 , 提 供 了 COUNT DISTINCT 自 动 打 散 功 能 , 通 过
HASH_CODE(distinct_key) % BUCKET_NUM 打散
HASH_CODE(distinct_key) % BUCKET_NUM 打散
Split Distinct 开启方式
table.optimizer.distinct-agg.split.enabled: true,默认 false。
table.optimizer.distinct-agg.split.bucket-num: Split Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
注意事项
(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Flink1.9.0 版本及以上版本才支持。
多维DISTINCT使用Filter
提交案例:多维Distinct
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a
提交案例:使用Filter
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a
原理概述
Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作在 b 列上。
此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问
此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问
七、常见故障排除
非法配置异常
看到从 TaskExecutorProcessUtils 或 JobManagerProcessUtils 抛出的
IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于 1 的
分数等)或配置冲突。请重新配置内存参数
IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于 1 的
分数等)或配置冲突。请重新配置内存参数
Java 堆空间异常
报 OutOfMemoryError: Java heap space 异常,通常表示 JVM Heap 太小
可以尝试通过增加总内存来增加 JVM 堆大小。也可以直接为 TaskManager 增加任务堆
内存或为 JobManager 增加 JVM 堆内存
直接缓冲存储器异常
报 OutOfMemoryError: Direct buffer memory 异常,通常表示 JVM 直接内
存限制太小或存在直接内存泄漏
元空间异常
报 OutOfMemoryError: Metaspace 异常,通常表示 JVM 元空间限制配置得太小
网络缓冲区数量不足
报 IOException: Insufficient number of network buffers 异常,这仅与
TaskManager 相关。通常表示配置的网络内存大小不够大。您可以尝试增加网络内存
超出容器内存异常
如果 Flink 容器尝试分配超出其请求大小(Yarn 或 Kubernetes)的内存,这通常表
明 Flink 没有预留足够的本机内存
如果在 JobManager 进程中遇到这个问题,还可以通过设置排除可能的 JVM Direct Memory 泄漏的选项来开启 JVM Direct Memory 的限制 jobmanager.memory.enable-jvm-direct-memory-limit: true
如果想手动多分一部分内存给 RocksDB 来防止超用,预防在云原生的环境因 OOM
被 K8S kill,可将 JVM OverHead 内存调大
Checkpoint 失败
Checkpoint Decline
Checkpoint Expire
如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint
也会失败
Checkpoint 慢
Source Trigger Checkpoint 慢
使用全量 Checkpoint
作业存在反压或者数据倾斜
Barrier 对齐慢
主线程太忙,导致没机会做 snapshot
同步阶段做的慢
异步阶段做的慢
Kafka动态发现分区
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + "");
Watermark不更新
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着
WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为
空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
比如 Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。由于下游
算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则
其 watermark 将不会发生变化,导致窗口、定时器等不会被触发
WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为
空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
比如 Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。由于下游
算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则
其 watermark 将不会发生变化,导致窗口、定时器等不会被触发
可以使用 WatermarkStrategy 来检测空闲输入并将其标记为
空闲状态.withIdleness(Duration.ofMinutes(5)
空闲状态.withIdleness(Duration.ofMinutes(5)
依赖冲突
打包插件建议使用 maven-shade-plugin。
超出文件描述符限制
脏数据导致数据转发失败
通讯超时
Flink on Yarn其他常见错误
https://developer.aliyun.com/article/719703
一、资源配置调优
内存设置
TaskManager内存模型
内存组成架构
托管内存(Managed Memory)
用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果
taskmanager.memory.managed.fraction,默认 0.4
taskmanager.memory.managed.size,默认 none
如果 size 没指定,则等于 Flink 内存*fraction
网络内存(Network)
网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
taskmanager.memory.network.fraction,默认 0.1
taskmanager.memory.network.min,默认 64mb
taskmanager.memory.network.max,默认 1gb
Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
taskmanager.memory.network.min,默认 64mb
taskmanager.memory.network.max,默认 1gb
Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
Task 内存
Task Heap(堆内)
taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。
Task Off-Heap(堆外)
taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存
框架内存
Flink 框架,即 TaskManager 本身所占用的内存,不计入 Slot 的资源中
Framework Heap(堆内)
taskmanager.memory.framework.heap.size,默认 128MB
Framework Off-Heap(堆外)
taskmanager.memory.framework.off-heap.size,默认 128MB
JVM 特定内存
JVM metaspace
JVM 元空间
taskmanager.memory.jvm-metaspace.size,默认 256mb
JVM over-head
JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译
缓存等所使用的内存
taskmanager.memory.jvm-overhead.fraction,默认 0.1
taskmanager.memory.jvm-overhead.min,默认 192mb
taskmanager.memory.jvm-overhead.max,默认 1gb
总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
taskmanager.memory.jvm-overhead.min,默认 192mb
taskmanager.memory.jvm-overhead.max,默认 1gb
总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
案例分析
基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,
比如指定为 4G,每一块内存得到大小如下:
(1)计算 Flink 内存
JVM 元空间 256m
JVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m
Flink 内存=4g-256m-409.6m=3430.4m
(2)网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m
(3)托管内存=3430.4m*0.4=1372.16m
(4)框架内存,堆内和堆外都是 128m
(5)Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m
生产资源配置示例
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用
QPS/TPS 来描述数据情况
QPS/TPS 来描述数据情况
合理利用cpu资源
使用DefaultResourceCalculator 策略
Yarn 的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根
据内存调度资源,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1
使用DominantResourceCalculator策略
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
容器的 vcore 数:
JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为默认
单个容器的 vcore 数=单 TM 的 slot 数
JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为默认
单个容器的 vcore 数=单 TM 的 slot 数
使用DominantResourceCalculator策略并指定容器vcore数
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore =3,总 vcore=3*3=9
并行度设置
全局并行度计算
开发完成后,先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限。然后
总 QPS/单并行度的处理能力 = 并行度
总 QPS/单并行度的处理能力 = 并行度
不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理
几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源
几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源
Source 端并行度的配置
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩
大分区,同时调大并行度等于分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩
大分区,同时调大并行度等于分区数。
Transform端并行度的配置
Keyby 之前的算子
般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度
可以和 source 保持一致
Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
Sink 端并行度的配置
Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进
行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置
二、状态及Checkpoint调优
RocksDB大状态调优
开启State访问性能监控
State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样
(sample),对不同的 State Backend 性能损失影响不同:
对于 RocksDB State Backend,性能损失大概在 1% 左右
对于 Heap State Backend,性能损失最多可达 10%
state.backend.latency-track.keyed-state-enabled:true #启用访问状态的性能监控
state.backend.latency-track.sample-interval: 100 #采样间隔
state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确
state.backend.latency-track.state-name-as-variable: true #将状态名作为变量
开启增量检查点和本地恢复
开启增量检查点
RocksDB 是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端
state.backend.incremental: true #默认 false,改为 true。
或代码中指定
new EmbeddedRocksDBStateBackend(true)
开启本地恢复
当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉
取数据。本地恢复目前仅涵盖键控类型的状态后端(RocksDB)
state.backend.local-recovery: true
设置多目录
state.backend.rocksdb.localdir:
/data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来
分担压力
分担压力
调整预定义选项
当 前 支 持 的 预 定 义 选 项 有 DEFAULT 、 SPINNING_DISK_OPTIMIZED 、
SPINNING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
#设置为机械硬盘+内存模式
增大block缓存
整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读
数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256 MB。
state.backend.rocksdb.block.cache-size: 64m #默认 8m
增大write buffer和 level 阈值大小
RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用独
占的 write buffer,默认 64MB,建议调大。
调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,
合并困难。建议设为 target_file_size_base(默认 64MB)的倍数,且不能太小,例如 5~10倍,即 320~640MB。
占的 write buffer,默认 64MB,建议调大。
调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,
合并困难。建议设为 target_file_size_base(默认 64MB)的倍数,且不能太小,例如 5~10倍,即 320~640MB。
state.backend.rocksdb.writebuffer.size: 128m
state.backend.rocksdb.compaction.level.max-size-level-base: 320m
增大write buffer数量
每个 Column Family 对应的 writebuffer 最大数量,这实际上是内存中“只读内存
表“的最大数量,默认值是 2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右
state.backend.rocksdb.writebuffer.count: 5
增大后台线程数和write buffer合并数
增大线程数
用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以
改为 4 等更大的值
state.backend.rocksdb.thread.num: 4
增大 writebuffer 最小合并数
将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 最小数量,默认
值为 1,可以调成 3
state.backend.rocksdb.writebuffer.number-to-merge: 3
开启分区索引功能
简单来说就是对 RocksDB 的 partitioned Index 做了多级索引
state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false
参数设定案例
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \
-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \
-Dstate.backend.rocksdb.block.cache-size=64m \
-Dstate.backend.rocksdb.writebuffer.size=128m \
-Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \
-Dstate.backend.rocksdb.writebuffer.count=5 \
-Dstate.backend.rocksdb.thread.num=4 \
-Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \
-Dstate.backend.rocksdb.memory.partitioned-index-filters=true \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \
-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \
-Dstate.backend.rocksdb.block.cache-size=64m \
-Dstate.backend.rocksdb.writebuffer.size=128m \
-Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \
-Dstate.backend.rocksdb.writebuffer.count=5 \
-Dstate.backend.rocksdb.thread.num=4 \
-Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \
-Dstate.backend.rocksdb.memory.partitioned-index-filters=true \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
Checkpoint设置
一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1 ~5 分钟)。对于
状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次
Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔。
同时,也需要考虑时效性的要求,需要在时效性和性能之间做一个
平衡,如果时效性要求高,结合 end- to-end 时长,设置秒级或毫秒级。
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,
可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗
时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次
Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔。
同时,也需要考虑时效性的要求,需要在时效性和性能之间做一个
平衡,如果时效性要求高,结合 end- to-end 时长,设置秒级或毫秒级。
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,
可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗
时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
三、反压处理
概述
反压的理解
简单来说,Flink 拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来
不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞
不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞
反压的危害
影响 checkpoint 时长
barrier 不会越过普通数据,数据处理被阻塞也会导致
checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长
checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长
影响 state 大小
barrier 对齐时,接受到较快的输入管道的 barrier 后,它后面数
据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会
被放到 state 里面,导致 checkpoint 变大
危害
checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同
样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理
内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题
样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理
内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题
定位反压节点
利用 Flink Web UI 定位
Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采
样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙
和反压的程度
利用Metrics定位
常用的几个 Metrics
outPoolUsage 发送端 Buffer 的使用率
inPoolUsage 接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上) 接收端 Floating Buffer 的使用率
exclusiveBuffersUsage(1.9 以上) 接收端 Exclusive Buffer 的使用率
其中 inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage
根据指标分析反压
分析反压的大致思路是:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它
被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导
至上游
可以进一步分析数据传输
Flink 1.9及以上版本,还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以
及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游Subtask 的数据传输。
在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer
Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。
及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游Subtask 的数据传输。
在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer
Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer。
产生反应的可能性
(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出
的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task
到 Sink Task 的第一个出现反压的节点
的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task
到 Sink Task 的第一个出现反压的节点
(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况,
需要继续排查下游节点,一直找到第一个为 OK 的一般就是根源节点
需要继续排查下游节点,一直找到第一个为 OK 的一般就是根源节点
反压的原因及处理
查看是否数据倾斜
我们可以通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。
使用火焰图分析
如果不是数据倾斜,最常见的问题可能是用户代码的执行效率问题(频繁被阻塞或者性
能问题),需要找到瓶颈算子中的哪部分计算逻辑消耗巨大。
最有用的办法就是对 TaskManager 进行 CPU profile,从中我们可以分析到 Task
Thread 是否跑满一个 CPU 核:如果是的话要分析 CPU 主要花费在哪些函数里面;如果
不是的话要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是
checkpoint 或者 GC 等系统活动导致的暂时系统暂停
纵向是调用链,从下往上,顶部就是正在执行的函数
横向是样本出现次数,可以理解为执行时长。
看顶层的哪个函数占据的宽度最大。只要有"平顶"(plateaus),就表示该函数可能存在性能问题
横向是样本出现次数,可以理解为执行时长。
看顶层的哪个函数占据的宽度最大。只要有"平顶"(plateaus),就表示该函数可能存在性能问题
分析GC情况
TaskManager 的内存以及 GC 问题也可能会导致反压,包括 TaskManager JVM 各
区内存不合理导致的频繁 Full GC 甚至失联。通常建议使用默认的 G1 垃圾回收器
通过 GC 日志分析出单个 Flink Taskmanager 堆总大小、年轻代、老年代分配的内
存空间、Full GC 后老年代剩余大小等
外部组件交互
如果发现我们的 Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检
查第三方组件是否遇到瓶颈,还有就是做维表 join 时的性能问题
四、数据倾斜
判断是否存在数据倾斜
相同 Task 的多个 Subtask 中,个别 Subtask 接收到的数据量明显大于其他
Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多
少数据,即可判断出 Flink 任务是否存在数据倾斜
有时 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾
斜的有用指标
数据倾斜的解决
keyBy 后的聚合操作存在数据倾斜
1、不能直接用二次聚合来处理
Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口(没攒
批)的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来
一条处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,
数据量并没有减少,
2、使用 LocalKeyBy 的思想
在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送
到下游,使下游接收到的数据量大大减少
keyBy 之前发生数据倾斜
如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实
例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些
原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少
这种情况,需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale
算子即可将数据均匀分配,从而解决数据倾斜的问题。
keyBy 后的窗口聚合操作存在数据倾斜
两阶段聚合实现思路
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二
阶段分组依据,避免不同窗口的结果聚合到一起
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二
阶段分组依据,避免不同窗口的结果聚合到一起
第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合
注意:随机数范围,需要自己去测,因为 keyby 的分区器是(两次 hash*下游并行度/最大并行度)
0 条评论
下一页