Iceberg总结
2023-01-12 11:04:16 0 举报
AI智能生成
Iceberg数据湖总结
作者其他创作
大纲/内容
概念:数据湖就是集中式的数据存储库,可以存储各种数据格式数据,例如:非结构化、结构化数据、文本、视频...
数据湖做到了离线和实时底层数据存储的统一,解决了Kappa架构的痛点问题
1.Kafka不支持海量数据存储
2.Kappa架构中使用Kafka做分层,Kafka不支持SQL OLAP分析
3.Kafka做分层不能很好的集成原有的数据血缘关系系统、数据质量管理系统
4.Kafka不支持数据的更新,只支持数据的Append
Kappa架构痛点问题
大数据中为什么需要数据湖
什么是数据湖
概念:Apache Iceberg是用于海量数据分析场景的表格式(Table Format),单表可以支持数十PB数据存储。可以和Hive、Presto、Spark、Flink做高效整合。Iceberg是一种数据湖解决方案。
iceberg支持实时/批量数据写入和读取,支持Spark/Flink计算引擎。
不绑定任何底层存储,支持Parquet、ORC、Avro格式兼容行存储和列存储。
Iceberg支持隐藏分区和分区变更,方便业务进行数据分区策略。
Iceberg支持快照数据重复查询,具备版本回滚功能。
Iceberg扫描计划很快,读取表或者查询文件可以不需要分布式SQL引擎。
Iceberg通过表元数据来对查询进行高效过滤。
基于乐观锁的并发支持,提供多线程并发写入能力并保证数据线性一致。
特点
注意:Iceberg非常轻量级,与Spark、Flink进行整合时就是一个jar包,官网:https://iceberg.apache.org
Iceberg概念及特点
Data file - 数据文件
Snapshot - 表快照
Manifest list - 清单列表
Manifest file - 清单文件
Iceberg术语
Table Format 表格式就是指元数据与数据组织方式
Iceberg表格式处于底层存储(例如:HDFS)和上层计算框架(Flink、Spark)之间
分支主题
Iceberg底层表格式
Iceberg表格式
Iceberg数据存储格式
Iceberg支持分区隐藏分区
Iceberg支持表演化
Iceberg支持Schema演化
Iceberg支持分区演化
Iceberg支持列顺序的演化
Iceberg特点
boolean\t布尔类型,true或者false\t
int\t32位有符号整形\t可以转换成long类型
long\t64位有符号整形\t
float\t单精度浮点型\t可以转换成double类型
double\t双精度浮点型\t
date\t日期,不含时间和时区\t
time\t时间,不含日期和时区\t以微秒存储,1000微秒 = 1毫秒
timestamp\t不含时区的timestamp\t以微秒存储,1000微秒 = 1毫秒
timestamptz\t含时区的timestamp\t以微秒存储,1000微秒 = 1毫秒
string\t任意长度的字符串类型\tUTF-8编码
fixed(L)\t长度为L的固定长度字节数组\t
binary\t任意长度的字节数组\t
struct<...>\t任意数据类型组成的一个结构化字段\t
list <E>\t任意数据类型组成的List\t
Iceberg支持数据类型
Iceberg0.12.1 与Hive 2.x & Hive3.1.2匹配
版本匹配
1.将 iceberg-hive-runtime.jar 与 libfb303-0.9.3.jar 上传到Hive服务端与客户端对应的lib目录中
<property> <name>iceberg.engine.hive.enabled</name> <value>true</value> </property>
2.在Hive客户端 $HIVE_HOME/conf/hive-site.xml中配置开启Iceberg支持
配置
Hive操作Iceberg支持多种Catalog元数据管理方式:Hive、Hadoop、第三方、自定义
iceberg.catalog.<catalog_name>.type -- 指定catalog类型
iceberg.catalog.<catalog_name>.warehouse -- 指定数据路径
配置Catalog 属性
1.不设置iceberg.catalog默认使用Hive catalog
3.iceberg.catalog指定成location_based_table,默认是将iceberg数据对应的路径加载成iceberg表
Hive&Iceberg 整合Catalog使用情况
1.如果Iceberg表有分区,使用Hive创建时直接指定分区就可以,Hive创建不支持隐藏分区
2.如果加载iceberg数据路径是分区表,创建iceberg表时分区字段当做普通字段处理就可以
注意
Hive 操作Iceberg格式表
Hive与Iceberg整合
Iceberg底层数据存储结构
查询当前最新数据
查询指定快照数据
根据时间戳查询数据
Iceberg如何根据元数据查询数据
Iceberg表数据组织与查询
2.需要导入maven pom相关依赖
代码实操
Hive Catalog
Hadoop Catalog
3.SparkSQL中设置Catalog
create table xxx (col1 xx...) using iceberg partitioned by (col1)
注意:向Iceberg分区表中插入数据之前,需要对数据按照分区列进行排序
创建分区表
timestampe转换分区
years
months
days
hours
创建隐藏分区
create table ... using iceberg as select ....
replace table .... as select ....
drop table ...
alter table ... add column ...
alter table ... drop column...
alter table .... rename column to ...
Alter 增加列、删除列、重命名列
alter table... add partition ...
alter table ... drop partition ...
Alter 增加、删除分区
4.Spark 与Iceberg 整合DDL操作
spark.read.format("iceberg").load("iceberg table path")
spark.table("iceberg tablename")
1.DataFrame 读取Iceberg表
select * from ${catalog名称}.${库名称}.${表名}.snapshots
2.SparkSQL查询iceberg表快照信息
select * from ${catalog名称}.${库名称}.${表名}.history
3.SparkSQL 查询iceberg表历史
select * from ${catalog名称}.${库名称}.${表名}.files
4.SparkSQL 查询iceberg datafiles
select * from ${catalog名称}.${库名称}.${表名}.manifest
5.SparkSQL 查询iceberg manifest
6.Spark查询Iceberg指定快照数据
7.Spark 指定时间戳查询iceberg数据
8.Spark 回滚Iceberg快照
9.合并Iceberg数据文件
删除历史快照时对应的不再引用的parquet数据文件也会被删除
目前SQL 方式删除快照有bug问题
write.metadata.delete-after-commit.enabled\t - 每次表提交后是否删除旧的元数据文件
write.metadata.previous-version-max\t - 要保留旧的元数据文件数量
在创建iceberg表时可以指定参数处理元数据文件增多问题
10.删除历史快照
5.Spark与Iceberg整合查询操作
1.Insert into
操作的是 t1表中的数据
注意: 更新数据时,在查询的数据匹配条件中只能有一条数据匹配,否则报错。
2.Merge into
将test3表中的记录全部覆盖到test2表中
spark.sql( \"\"\
普通表insert overwrite
会将test1的原数据全部删除,将test3的数据覆盖到test1中
spark.sql( \"\"\
动态覆盖
会覆盖匹配上分区的数据,没有则新增: test1如果分区名字对应上,test3的数据会覆盖匹配上分区test1里面的数据,如果没有该分区,则新增新分区
spark.sql( \"\"\
静态覆盖
分区表overwrite
3.Insert overwrite
delete from 如果where条件删除的是整个分区,那么只会修改元数据
delete from 如果where条件删除的是某条数据,iceberg会重写当前这条数据所在的paquet文件数据
4.delete from
update .... set ...where ..
5.Update
写入普通表
df.writeTo(\"hadoop_prod.iceberg_db.df_tbl1\").create()
写入分区表 写入前一定要按照分区排序
df.sortWithinPartitions($\"loc\") .writeTo(\"hadoop_prod.default.df_tbl2\") .partitionedBy($\"loc\") .create()
spark.read.table("hadoop_prod.default.df_tbl2").show()
df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ...
df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ..
df.write(tbl).append() 相当于 INSERT INTO ...
df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...
操作
6.DataFrame 读写Iceberg
6.Spark 与Iceberg整合写操作
1.StructuredStreaming 向Iceberg写数据编码
写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。
2.注意事项
7.Structured Streaming 实时向Iceberg写数据
Spark3.1.2与Iceberg0.12.1整合
Flink 支持DataStream API 和 SQL API 批量/实时操作Iceberg
需要设置Checkpoint,Flink向Iceberg中写入Commit数据时,只有Checkpoint成功之后才会Commit数据,否则后期在Hive中查询不到数据。
读取Kafka数据后需要包装成RowData或者Row对象,才能向Iceberg表中写出数据。写出数据时默认是追加数据,如果指定overwrite就是全部覆盖数据。
不建议使用DataStream API 向Iceberg中写数据,建议使用SQL API。
注意问题
可以创建Hive 对应的Iceberg表,可以实时看到表中数据增加,但是注意:虽然loc是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存Iceberg数据路径一致。
DataStream API 实时读取Kafka 数据写入Iceberg表
DataStream<RowData> batchData = FlinkSource.forRowData().env(env) .tableLoader(tableLoader) .streaming(true) .build();
DataStream API 批量/实时读取Iceberg表数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env) .tableLoader(tableLoader) .startSnapshotId(5001264950716649450L) .streaming(true) .build();
DataStream API 基于快照来批量/实时读取Iceberg表数据
//合并data files Actions.forTable(table) .rewriteDataFiles() .targetSizeInBytes(536870912L) .execute();
定义提交Flink任务合并Data files
DataStream API 操作Iceberg
1.创建Catalog
2.使用当前Catalog
3.创建数据库
4.使用当前库
5.创建Iceberg 表
6.向iceberg表中插入数据
SQL API 创建Iceberg表并写入数据
批量查询就是写SQL 直接查询
实时查询需要设置table.dynamic-table-options.enabled 为true,并且在SQL中指定Option
SQL API 批量/实时查询Iceberg表数据
实时查询需要设置table.dynamic-table-options.enabled 为true,并且在SQL中指定Option : start-snapshot-id
SQL API 基于某个快照ID 实时增量读取iceberg数据
案例:SQL API 实现实时读取Kafka 数据写入Iceberg表
Flink SQL API 操作Iceberg
Iceberg目前不支持Flink SQL 查询表的元数据信息,需要使用Java API 实现。
Flink不支持创建带有隐藏分区的Iceberg表
Flink不支持带有WaterMark的Iceberg表
Flink不支持添加列、删除列、重命名列操作。
Flink对Iceberg Connector支持并不完善。
Flink与Iceberg整合不足
Flink 与 Iceberg整合
都是构建于存储格式之上的数据组织方式
提供ACID能力,提供一定的事务、并行执行能力
提供行级别数据修改能力。
提供一定的Schema扩展能力,例如:新增、修改、删除列操作。
支持数据合并,处理小文件。
支持Time travel 查询快照数据。
支持批量和实时数据读写
相同点
Iceberg支持Parquet、avro、orc数据格式,Hudi支持Parquet和Avro格式。
两者数据存储和查询机制不同
对于处理小文件合并时,Iceberg只支持API方式手动处理合并小文件,Hudi对于小文件合并处理可以根据配置自动的执行。
Spark与Iceberg和Hudi整合时,Iceberg对SparkSQL的支持目前来看更好。Spark与Hudi整合更多的是Spark DataFrame API 操作。
关于Schema方面,Iceberg Schema与计算引擎是解耦的,不依赖任何的计算引擎,而Hudi的Schema依赖于计算引擎Schema。
不同点
Iceberg 与 Hudi数据湖技术对比
数据湖Iceberg
0 条评论
下一页