数据湖技术梳理
2022-09-10 09:17:30 3 举报
AI智能生成
数据户技术整理
作者其他创作
大纲/内容
Hudi
总结
读写性能都非常佳
时间轴
组件
操作类型 : 对数据集执行的操作类型
即时时间 : 即时时间通常是一个时间戳(例如:20190117010349),该时间戳按操作开始时间的顺序单调增加。
状态 : 即时的状态
关键点
COMMITS - 一次提交表示将一组记录原子写入到数据集中。
CLEANS - 删除数据集中不再需要的旧文件版本的后台活动。
DELTA_COMMIT - 增量提交是指将一批记录原子写入到MergeOnRead存储类型的数据集中,其中一些/所有数据都可以只写到增量日志中。
COMPACTION - 协调Hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩表现为时间轴上的特殊提交。
ROLLBACK - 表示提交/增量提交不成功且已回滚,删除在写入过程中产生的所有部分文件。
SAVEPOINT - 将某些文件组标记为"已保存",以便清理程序不会将其删除。在发生灾难/数据恢复的情况下,它有助于将数据集还原到时间轴上的某个点。
状态
REQUESTED - 表示已调度但尚未启动的操作。
INFLIGHT - 表示当前正在执行该操作。
COMPLETED - 表示在时间轴上完成了该操作。
存储类型和视图
存储类型
写时复制
优先支持在文件级原子更新数据,而无需重写整个表/分区
能够只读取更新的部分,而不是进行低效的扫描或搜索
严格控制文件大小来保持出色的查询性能(小的文件会严重损害查询性能)
读时合并
读时合并存储是写时复制的升级版,从某种意义上说,它仍然可以通过读优化表提供数据集的读取优化视图(写时复制的功能)
支持视图
读优化 + 增量
读优化 + 增量 + 近实时
视图
读优化视图
增量视图
实时视图
写操作
写入方式
INSERT(插入)
UPSERT(插入更新)
BULK_INSERT(批插入)
DeltaStreamer
从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件 增量导入
支持json、avro或自定义记录类型的传入数据
管理检查点,回滚和恢复
利用DFS或Confluent schema注册表的Avro模式。
支持自定义转换操作
Datasource Writer
hudi-spark模块提供了DataSource API,可以将任何DataFrame写入(也可以读取)到Hudi数据集中
存储管理
Hudi中的小文件处理功能,可以分析传入的工作负载并将插入内容分配到现有文件组中, 而不是创建新文件组。新文件组会生成小文件。
可以配置Cleaner来清理较旧的文件片,清理的程度可以调整, 具体取决于查询所需的最长时间和增量拉取所需的回溯。
用户还可以调整基础/parquet文件、日志文件的大小 和预期的压缩率,使足够数量的插入被分到同一个文件组中,最终产生大小合适的基础文件。
智能调整批插入并行度,可以产生大小合适的初始文件组。 实际上,正确执行此操作非常关键,因为文件组一旦创建后就不能删除,只能如前所述对其进行扩展。
对于具有大量更新的工作负载,读取时合并存储提供了一种很好的机制, 可以快速将其摄取到较小的文件中,之后通过压缩将它们合并为较大的基础文件。
删除数据
Soft Deletes(软删除)
Hard Deletes(硬删除)
与Hive同步
支持将数据集的最新模式同步到Hive Metastore,以便查询新的列和分区。 如果需要从命令行或在独立的JVM中运行它,Hudi提供了一个HiveSyncTool
查询数据
视图
hudi_tbl 实现了由 HoodieParquetInputFormat 支持的数据集的读优化视图,从而提供了纯列式数据。
hudi_tbl_rt 实现了由 HoodieParquetRealtimeInputFormat 支持的数据集的实时视图,从而提供了基础数据和日志数据的合并视图。
hive
HiveServer2需要在其辅助jars路径中提供hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar
spark
Hudi DataSource:支持读取优化和增量拉取,类似于标准数据源(例如:spark.read.parquet)的工作方式。
以Hive表读取:支持所有三个视图,包括实时视图,依赖于自定义的Hudi输入格式(再次类似Hive)。
读优化表
要使用SparkSQL将RO表读取为Hive表,只需按如下所示将路径过滤器推入sparkContext
增量拉取
hudi-spark模块提供了DataSource API,这是一种从Hudi数据集中提取数据并通过Spark处理数据的更优雅的方法。
Presto
Presto是一种常用的查询引擎,可提供交互式查询性能。 Hudi RO表可以在Presto中无缝查询。
配置
一般配置
Spark数据源配置
WriteClient 配置
RecordPayload 配置
与云存储连接
AWS S3:S3和Hudi协同工作所需的配置。
Google Cloud Storage:GCS和Hudi协同工作所需的
Spark数据源配置
写选项
TABLE_NAME_OPT_KEY 属性:hoodie.datasource.write.table.name [必须] Hive表名,用于将数据集注册到其中。
OPERATION_OPT_KEY 属性:hoodie.datasource.write.operation, 默认值:upsert 是否为写操作进行插入更新、插入或批量插入。使用bulkinsert将新数据加载到表中,之后使用upsert或insert。 批量插入使用基于磁盘的写入路径来扩展以加载大量输入,而无需对其进行缓存。
STORAGE_TYPE_OPT_KEY 属性:hoodie.datasource.write.storage.type, 默认值:COPY_ON_WRITE 此写入的基础数据的存储类型。两次写入之间不能改变。
PRECOMBINE_FIELD_OPT_KEY 属性:hoodie.datasource.write.precombine.field, 默认值:ts 实际写入之前在preCombining中使用的字段。 当两个记录具有相同的键值时,我们将使用Object.compareTo(..)从precombine字段中选择一个值最大的记录。
PAYLOAD_CLASS_OPT_KEY 属性:hoodie.datasource.write.payload.class, 默认值:org.apache.hudi.OverwriteWithLatestAvroPayload 使用的有效载荷类。如果您想在插入更新或插入时使用自己的合并逻辑,请重写此方法。 这将使得PRECOMBINE_FIELD_OPT_VAL设置的任何值无效
RECORDKEY_FIELD_OPT_KEY 属性:hoodie.datasource.write.recordkey.field, 默认值:uuid 记录键字段。用作HoodieKey中recordKey部分的值。 实际值将通过在字段值上调用.toString()来获得。可以使用点符号指定嵌套字段,例如:a.b.c
PARTITIONPATH_FIELD_OPT_KEY 属性:hoodie.datasource.write.partitionpath.field, 默认值:partitionpath 分区路径字段。用作HoodieKey中partitionPath部分的值。 通过调用.toString()获得实际的值
KEYGENERATOR_CLASS_OPT_KEY 属性:hoodie.datasource.write.keygenerator.class, 默认值:org.apache.hudi.SimpleKeyGenerator 键生成器类,实现从输入的Row对象中提取键
COMMIT_METADATA_KEYPREFIX_OPT_KEY 属性:hoodie.datasource.write.commitmeta.key.prefix, 默认值:_ 以该前缀开头的选项键会自动添加到提交/增量提交的元数据中。 这对于与hudi时间轴一致的方式存储检查点信息很有用
INSERT_DROP_DUPS_OPT_KEY 属性:hoodie.datasource.write.insert.drop.duplicates, 默认值:false 如果设置为true,则在插入操作期间从传入DataFrame中过滤掉所有重复记录。
HIVE_SYNC_ENABLED_OPT_KEY 属性:hoodie.datasource.hive_sync.enable, 默认值:false 设置为true时,将数据集注册并同步到Apache Hive Metastore
HIVE_DATABASE_OPT_KEY 属性:hoodie.datasource.hive_sync.database, 默认值:default 要同步到的数据库
HIVE_TABLE_OPT_KEY 属性:hoodie.datasource.hive_sync.table, [Required] 要同步到的表
HIVE_USER_OPT_KEY 属性:hoodie.datasource.hive_sync.username, 默认值:hive 要使用的Hive用户名
HIVE_PASS_OPT_KEY 属性:hoodie.datasource.hive_sync.password, 默认值:hive 要使用的Hive密码
HIVE_URL_OPT_KEY 属性:hoodie.datasource.hive_sync.jdbcurl, 默认值:jdbc:hive2://localhost:10000 Hive metastore url
HIVE_PARTITION_FIELDS_OPT_KEY 属性:hoodie.datasource.hive_sync.partition_fields, 默认值: 数据集中用于确定Hive分区的字段。
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY 属性:hoodie.datasource.hive_sync.partition_extractor_class, 默认值:org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor 用于将分区字段值提取到Hive分区列中的类。
HIVE_ASSUME_DATE_PARTITION_OPT_KEY 属性:hoodie.datasource.hive_sync.assume_date_partitioning, 默认值:false 假设分区格式是yyyy/mm/dd
读选项
VIEW_TYPE_OPT_KEY 属性:hoodie.datasource.view.type, 默认值:read_optimized 是否需要以某种模式读取数据,增量模式(自InstantTime以来的新数据) (或)读优化模式(基于列数据获取最新视图) (或)实时模式(基于行和列数据获取最新视图)
BEGIN_INSTANTTIME_OPT_KEY 属性:hoodie.datasource.read.begin.instanttime, [在增量模式下必须] 开始增量提取数据的即时时间。这里的instanttime不必一定与时间轴上的即时相对应。 取出以instant_time > BEGIN_INSTANTTIME写入的新数据。 例如:'20170901080000'将获取2017年9月1日08:00 AM之后写入的所有新数据。
END_INSTANTTIME_OPT_KEY 属性:hoodie.datasource.read.end.instanttime, 默认值:最新即时(即从开始即时获取所有新数据) 限制增量提取的数据的即时时间。取出以instant_time <= END_INSTANTTIME写入的新数据。
WriteClient 配置
withPath(hoodie_base_path) 属性:hoodie.base.path [必须] 创建所有数据分区所依据的基本DFS路径。 始终在前缀中明确指明存储方式(例如hdfs://,s3://等)。 Hudi将有关提交、保存点、清理审核日志等的所有主要元数据存储在基本目录下的.hoodie目录中。
withSchema(schema_str) 属性:hoodie.avro.schema [必须] 这是数据集的当前读取器的avro模式(schema)。 这是整个模式的字符串。HoodieWriteClient使用此模式传递到HoodieRecordPayload的实现,以从源格式转换为avro记录。 在更新过程中重写记录时也使用此模式。
forTable(table_name) 属性:hoodie.table.name [必须] 数据集的表名,将用于在Hive中注册。每次运行需要相同。
withBulkInsertParallelism(bulk_insert_parallelism = 1500) 属性:hoodie.bulkinsert.shuffle.parallelism 批量插入旨在用于较大的初始导入,而此处的并行度决定了数据集中文件的初始数量。 调整此值以达到在初始导入期间所需的最佳尺寸。
withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500) 属性:hoodie.insert.shuffle.parallelism, hoodie.upsert.shuffle.parallelism 最初导入数据后,此并行度将控制用于读取输入记录的初始并行度。 确保此值足够高,例如:1个分区用于1 GB的输入数据
combineInput(on_insert = false, on_update=true) 属性:hoodie.combine.before.insert, hoodie.combine.before.upsert 在DFS中插入或更新之前先组合输入RDD并将多个部分记录合并为单个记录的标志
withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER) 属性:hoodie.write.status.storage.level HoodieWriteClient.insert和HoodieWriteClient.upsert返回一个持久的RDD[WriteStatus], 这是因为客户端可以选择检查WriteStatus并根据失败选择是否提交。这是此RDD的存储级别的配置
withAutoCommit(autoCommit = true) 属性:hoodie.auto.commit 插入和插入更新后,HoodieWriteClient是否应该自动提交。 客户端可以选择关闭自动提交,并在"定义的成功条件"下提交
withAssumeDatePartitioning(assumeDatePartitioning = false) 属性:hoodie.assume.date.partitioning HoodieWriteClient是否应该假设数据按日期划分,即从基本路径划分为三个级别。 这是支持<0.3.1版本创建的表的一个补丁。最终将被删除
withConsistencyCheckEnabled(enabled = false) 属性:hoodie.consistency.check.enabled HoodieWriteClient是否应该执行其他检查,以确保写入的文件在基础文件系统/存储上可列出。 将其设置为true可以解决S3的最终一致性模型,并确保作为提交的一部分写入的所有数据均能准确地用于查询。
索引配置
withIndexConfig (HoodieIndexConfig) 可插入以具有外部索引(HBase)或使用存储在Parquet文件中的默认布隆过滤器(bloom filter)
withIndexType(indexType = BLOOM) 属性:hoodie.index.type 要使用的索引类型。默认为布隆过滤器。可能的选项是[BLOOM | HBASE | INMEMORY]。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中
bloomFilterNumEntries(numEntries = 60000) 属性:hoodie.index.bloom.num_entries 仅在索引类型为BLOOM时适用。 这是要存储在布隆过滤器中的条目数。 我们假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。HUDI-56 描述了如何动态地对此进行计算。 警告:将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。
bloomFilterFPP(fpp = 0.000000001) 属性:hoodie.index.bloom.fpp 仅在索引类型为BLOOM时适用。 根据条目数允许的错误率。 这用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),我们希望在磁盘空间上进行权衡以降低误报率
bloomIndexPruneByRanges(pruneRanges = true) 属性:hoodie.bloom.index.prune.by.ranges 仅在索引类型为BLOOM时适用。 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。
bloomIndexUseCaching(useCaching = true) 属性:hoodie.bloom.index.use.caching 仅在索引类型为BLOOM时适用。 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找
bloomIndexTreebasedFilter(useTreeFilter = true) 属性:hoodie.bloom.index.use.treebased.filter 仅在索引类型为BLOOM时适用。 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度
bloomIndexBucketizedChecking(bucketizedChecking = true) 属性:hoodie.bloom.index.bucketized.checking 仅在索引类型为BLOOM时适用。 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差
bloomIndexKeysPerBucket(keysPerBucket = 10000000) 属性:hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。
bloomIndexParallelism(0) 属性:hoodie.bloom.index.parallelism 仅在索引类型为BLOOM时适用。 这是索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,这是根据输入的工作负载特征自动计算的
hbaseZkQuorum(zkString) [必须] 属性:hoodie.index.hbase.zkquorum 仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum URL。
hbaseZkPort(port) [必须] 属性:hoodie.index.hbase.zkport 仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum端口。
hbaseZkZnodeParent(zkZnodeParent) [必须] 属性:hoodie.index.hbase.zknode.path 仅在索引类型为HBASE时适用。这是根znode,它将包含HBase创建及使用的所有znode。
hbaseTableName(tableName) [必须] 属性:hoodie.index.hbase.table 仅在索引类型为HBASE时适用。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。
存储选项
limitFileSize (size = 120MB) 属性:hoodie.parquet.max.file.size Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。
parquetBlockSize(rowgroupsize = 120MB) 属性:hoodie.parquet.block.size Parquet行组大小。最好与文件大小相同,以便将文件中的单个列连续存储在磁盘上
parquetPageSize(pagesize = 1MB) 属性:hoodie.parquet.page.size Parquet页面大小。页面是parquet文件中的读取单位。 在一个块内,页面被分别压缩。
parquetCompressionRatio(parquetCompressionRatio = 0.1) 属性:hoodie.parquet.compression.ratio 当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值
parquetCompressionCodec(parquetCompressionCodec = gzip) 属性:hoodie.parquet.compression.codec Parquet压缩编解码方式名称。默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo]
logFileMaxSize(logFileSize = 1GB) 属性:hoodie.logfile.max.size LogFile的最大大小。这是在将日志文件移到下一个版本之前允许的最大大小。
logFileDataBlockMaxSize(dataBlockSize = 256MB) 属性:hoodie.logfile.data.block.max.size LogFile数据块的最大大小。这是允许将单个数据块附加到日志文件的最大大小。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。
logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35) 属性:hoodie.logfile.to.parquet.compression.ratio 随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。
parquetCompressionCodec(parquetCompressionCodec = gzip) 属性:hoodie.parquet.compression.codec Parquet文件的压缩编解码方式
压缩(Compaction)配置
withCleanerPolicy(policy = KEEP_LATEST_COMMITS) 属性:hoodie.cleaner.policy 要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。最好确保数据保留的时间超过最大查询执行时间。
retainCommits(no_of_commits_to_retain = 24) 属性:hoodie.cleaner.commits.retained 留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的)。 这也直接转化为您可以逐步提取此数据集的数量
archiveCommitsWith(minCommits = 96, maxCommits = 128) 属性:hoodie.keep.min.commits, hoodie.keep.max.commits 每个提交都是.hoodie目录中的一个小文件。由于DFS通常不支持大量小文件,因此Hudi将较早的提交归档到顺序日志中。 提交通过重命名提交文件以原子方式发布。
withCommitsArchivalBatchSize(batch = 10) 属性:hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。
compactionSmallFileSize(size = 0) 属性:hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。 由于批处理中分区中插入记录的数量众多,总会出现小文件。 Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。 此处的大小是被视为“小文件大小”的最小文件大小。
insertSplitSize(size = 500000) 属性:hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。 写出100MB的文件,至少1kb大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。 将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)
autoTuneInsertSplits(true) 属性:hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize。默认关闭。
approxRecordSize(size = 1024) 属性:hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。如上所述。
withInlineCompaction(inlineCompaction = false) 属性:hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩
withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommitsBeforeCompaction = 10) 属性:hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数
withCompactionLazyBlockReadEnabled(true) 属性:hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。 选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)
withCompactionReverseLogReadEnabled(false) 属性:hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。 如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件
withCleanerParallelism(cleanerParallelism = 200) 属性:hoodie.cleaner.parallelism 如果清理变慢,请增加此值。
withCompactionStrategy(compactionStrategy = org.apache.hudi.io.compact.strategy.LogFileSizeBasedCompactionStrategy) 属性:hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。 默认情况下,Hudi选择具有累积最多未合并数据的日志文件
withTargetIOPerCompactionInMB(targetIOPerCompactionInMB = 500000) 属性:hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。
withTargetPartitionsPerDayBasedCompaction(targetPartitionsPerCompaction = 10) 属性:hoodie.compaction.daybased.target 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。
withPayloadClass(payloadClassName = org.apache.hudi.common.model.HoodieAvroPayload) 属性:hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。 就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。
压缩(Compaction)配置
withCleanerPolicy(policy = KEEP_LATEST_COMMITS) 属性:hoodie.cleaner.policy 要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。最好确保数据保留的时间超过最大查询执行时间。
retainCommits(no_of_commits_to_retain = 24) 属性:hoodie.cleaner.commits.retained 留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的)。 这也直接转化为您可以逐步提取此数据集的数量
archiveCommitsWith(minCommits = 96, maxCommits = 128) 属性:hoodie.keep.min.commits, hoodie.keep.max.commits 每个提交都是.hoodie目录中的一个小文件。由于DFS通常不支持大量小文件,因此Hudi将较早的提交归档到顺序日志中。 提交通过重命名提交文件以原子方式发布。
withCommitsArchivalBatchSize(batch = 10) 属性:hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。
compactionSmallFileSize(size = 0) 属性:hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。 由于批处理中分区中插入记录的数量众多,总会出现小文件。 Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。 此处的大小是被视为“小文件大小”的最小文件大小。
insertSplitSize(size = 500000) 属性:hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。 写出100MB的文件,至少1kb大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。 将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)
autoTuneInsertSplits(true) 属性:hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize。默认关闭。
approxRecordSize(size = 1024) 属性:hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。如上所述。
withInlineCompaction(inlineCompaction = false) 属性:hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩
withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommitsBeforeCompaction = 10) 属性:hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数
withCompactionLazyBlockReadEnabled(true) 属性:hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。 选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)
withCompactionReverseLogReadEnabled(false) 属性:hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。 如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件
withCleanerParallelism(cleanerParallelism = 200) 属性:hoodie.cleaner.parallelism 如果清理变慢,请增加此值。
withCompactionStrategy(compactionStrategy = org.apache.hudi.io.compact.strategy.LogFileSizeBasedCompactionStrategy) 属性:hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。 默认情况下,Hudi选择具有累积最多未合并数据的日志文件
withTargetIOPerCompactionInMB(targetIOPerCompactionInMB = 500000) 属性:hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。
withTargetPartitionsPerDayBasedCompaction(targetPartitionsPerCompaction = 10) 属性:hoodie.compaction.daybased.target 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。
withPayloadClass(payloadClassName = org.apache.hudi.common.model.HoodieAvroPayload) 属性:hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。 就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。
指标配置
on(metricsOn = true) 属性:hoodie.metrics.on 打开或关闭发送指标。默认情况下处于启用状态。
withReporterType(reporterType = GRAPHITE) 属性:hoodie.metrics.reporter.type 指标报告者的类型。默认使用graphite,也是唯一支持的类型。
toGraphiteHost(host = localhost) 属性:hoodie.metrics.graphite.host 要连接的graphite主机
onGraphitePort(port = 4756) 属性:hoodie.metrics.graphite.port 要连接的graphite端口
usePrefix(prefix = "") 属性:hoodie.metrics.graphite.metric.prefix 适用于所有指标的标准前缀。这有助于添加如数据中心、环境等信息
内存配置
withMaxMemoryFractionPerPartitionMerge(maxMemoryFractionPerPartitionMerge = 0.6) 属性:hoodie.memory.merge.fraction 该比例乘以用户内存比例(1-spark.memory.fraction)以获得合并期间要使用的堆空间的最终比例
withMaxMemorySizePerCompactionInBytes(maxMemorySizePerCompactionInBytes = 1GB) 属性:hoodie.memory.compaction.fraction HoodieCompactedLogScanner读取日志块,将记录转换为HoodieRecords,然后合并这些日志块和记录。 在任何时候,日志块中的条目数可以小于或等于相应的parquet文件中的条目数。这可能导致Scanner出现OOM。 因此,可溢出的映射有助于减轻内存压力。使用此配置来设置可溢出映射的最大允许inMemory占用空间。
withWriteStatusFailureFraction(failureFraction = 0.1) 属性:hoodie.memory.writestatus.failure.fraction 此属性控制报告给驱动程序的失败记录和异常的比例配置。
插入更新
Hudi插入更新在t1表的一次提交中就进行了高达4TB的压力测试
读优化查询
对Hudi和非Hudi数据集的Hive、Presto、Spark查询,比较大的优势
管理
通过Admin CLI进行管理
建表
create --path /user/hive/warehouse/table1 --tableName hoodie_table_1 --tableType COPY_ON_WRITE
查看
desc
连接数据集
connect --path /app/uber/trips
帮助
help
检查提交
commits show --sortBy "Total Bytes Written" --desc true --limit 10
hdfs
hdfs dfs -ls /app/uber/trips/.hoodie/*.inflight
特定提交
commit showpartitions --commit 20161005165855 --sortBy "Total Bytes Written" --desc true --limit 10
文件集粒度
commit showfiles --commit 20161005165855 --sortBy "Partition Path"
文件视图
show fsview all
统计信息
stats filesizes --partitionPath 2016/09/01 --sortBy "95th" --desc true --limit 10
stats wa
压缩
compactions show all
compaction show --instant <INSTANT_1>
compaction validate --instant 20181005222611-验证压缩
hudi client
提交持续时间 - 这是成功提交一批记录所花费的时间
回滚持续时间 - 同样,撤消失败的提交所剩余的部分数据所花费的时间(每次写入失败后都会自动发生)
文件级别指标 - 显示每次提交中新增、版本、删除(清除)的文件数量
记录级别指标 - 每次提交插入/更新的记录总数
分区级别指标 - 更新的分区数量(对于了解提交持续时间的突然峰值非常有用)
故障排除
_hoodie_record_key - 作为每个DFS分区内的主键,是所有更新/插入的基础
_hoodie_commit_time - 该记录上次的提交
_hoodie_file_name - 包含记录的实际文件名(对检查重复非常有用)
_hoodie_partition_path - basePath的路径,该路径标识包含此记录的分区
Graphite指标
Hudi应用程序的Spark UI
实时计算
Hudi 和 Fink集成
实现了新的 Flink streaming writer
支持 batch 和 streaming 模式 reader
支持 Flink SQL API
操作
insert into <table> values有则更新,无则写入
存储
HDFS,Kudu,TiDB,Kafka,Hudi,MySQL
优势
Update / Delete 记录:Hudi 使用细粒度的文件/记录级别索引,来支持 Update / Delete 记录,同时还提供写操作的事务保证,支持 ACID 语义。查询会处理最后一个提交的快照,并基于此输出结果;
变更流:Hudi 对获取数据变更提供了流的支持,可以从给定的时间点获取给定表中已 updated / inserted / deleted 的所有记录的增量流,可以查询不同时间的状态数据;
技术栈统一:可以兼容我们现有的 adhoc 查询引擎 presto,spark。
社区更新迭代速度快:已经支持 Flink 两种不同方式的的读写操作,如 COW 和 MOR。
坑
Hudi 对 Flink 的支持不是很成熟,我们对 Spark - StrunctStreaming 做了大量的开发和测试
1、如果使用无分区的 COW 写入的方式,在千万级写入量的时候会发现写入越来越慢;
2、后来我们将无分区的改为增量分区的方式写入,速度提升了很多。
因为 spark 在写入时会读取 basefile 文件索引,文件越大越多,读取文件索引就会越慢
随着 Flink 对 hudi 支持越来越好
1、类冲突
2、不能找到 class 文件
3、rocksdb 冲突
4、当有依赖冲突时,我们会把 Flink 模块相关或者 Hudi 模块相关的冲突依赖 exclude 掉。
5、而如果有其他依赖包找不到的情况,我们会把相关的依赖通过 pom 文件引入进来。
checkpoint问题
checkpoint 太大导致 checkpoint 时间过长而引起的失败
我们设置状态的 TTL 时间,把全量 checkpoint 改为增量 checkpoint,且提高并行度来解决
COW 和 MOR 的选择
Hudi 表以 COW 居多,之所以选择 COW
1、第一是因为我们目前历史存量 ODS 的数据都是一次性导入到 datalake 数据表中,不存在写放大的情况。
2、另外一个原因是,COW 的工作流比较简单,不会涉及到 compaction 这样的额外操作。
hudi中间存储
MQ 不再担任实时数据仓库存储的中间存储介质,而 Hudi 存储在 HDFS 上,可以存储海量数据集;
实时数据仓库中间层可以使用 OLAP 分析引擎查询中间结果数据;
真正意义上的批流一体,数据 T+1 延迟的问题得到解决;
读时 Schema 不再需要严格定义 Schema 类型,支持 schema evolution;
支持主键索引,数据查询效率数倍增加,并且支持 ACID 语义,保证数据不重复不丢失;
Hudi 具有 Timeline 的功能,可以更多存储数据中间的状态数据,数据完备性更强。
选择理由
Hudi 提供了一个在 HDFS 中 upsert 的解决方案,即类似关系型数据库的使用体验,对于可更新数据非常友好,并且也符合 MySQL binlog 的语义。
增量查询,可以很方便的获取最近 30 分钟,或者 1 天内发生变化的数据,这对于一些可叠加的离线计算任务非常友好,不再需要针对全量数据进行计算,只需要针对变化数据进行计算,大大节省了机器资源和时间。
可以实时同步元数据到 Hive,为“入湖即可查”创造了条件。
对 COW 和 MOR 两种不同使用场景分别进行了优化。
Hudi 社区开放且迭代速度快,在其孵化阶段就被 AWS EMR 集成,然后被阿里云 DLA 数据湖分析[2]、阿里云 EMR[3]以及腾讯云 EMR[4]集成,前景不错,同时 ApacheHudi 国内技术交流群讨论非常热烈,国内基于 Hudi 构建数据湖的公司越来越多。
集成能力
0.8.0之前的版本“迫不得已”的双擎策略,事实上是非常苦恼的,运维和开发方式都无法统一
Hudi 文件为了保证元数据的一致性,在 0.8.0 版本之前不支持并发写
同时也已经在 Hudi 0.8.0 版本深度集成了 Flink 能力
Delta
简介
Delta Lake是一个开源项目,可以在数据湖之上构建Lakehouse 架构。Delta Lake 提供 ACID 事务、可扩展的元数据处理,并在现有数据湖之上统一流和批处理数据处理。
为 Apache Spark 和大数据 workloads 提供 ACID 事务能力
通过写和快照隔离之间的乐观并发控制(optimistic concurrency control),在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖(data lakes)带来可靠性
Delta Lake 还提供内置数据版本控制,以便轻松回滚
优缺点
数据 merge 方面性能不如 Hudi
查询方面性能不如 Iceberg
与 Spark 的整合能力强
能力
Spark 上的 ACID 事务:可序列化的隔离级别确保读者永远不会看到不一致的数据。
可扩展的元数据处理:利用 Spark 分布式处理能力轻松处理 PB 级表的所有元数据,其中包含数十亿个文件。
流式和批处理统一:Delta Lake 中的表是批处理表,也是流式源和接收器。流数据摄取、批量历史回填、交互式查询都是开箱即用的。
架构实施:自动处理架构变化以防止在摄取期间插入不良记录。
时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可重复的机器学习实验。
Upserts 和 deletes:支持合并、更新和删除操作,以实现复杂的用例,如更改数据捕获、缓慢变化维度 (SCD) 操作、流式更新插入等。
阿里云DLA
简介
云原生数据湖分析(简称DLA)是新一代大数据解决方案,采取计算与存储完全分离的架构,支持数据库(RDS\PolarDB\NoSQL)与消息实时归档建仓,提供弹性的Spark与Presto,满足在线交互式查询、流处理、批处理、机器学习等诉求,也是传统Hadoop方案上云的有竞争力的解决方案
类型
Presto
Spark
dla+oss优势
一体化、端到端(入湖=>管理=>ETL =>分析查询),产品体验好;组件精耕细作Presto、Spark
云原生、弹性强、一分钟可弹300节点参与计算
内置大量优化+弹性,比开源自建集群至少降低50%+的成本
链路大量优化、Hudi大量优化,产品化支持(实现中)
低(即开即用、零运维成本)
支持数据库模式库、表授权模式,多租户
针对阿里云OSS & OTS &ADB 等数据源深度优化,Presto与Spark内核大量优化
支持数据源source
OSS
配置模式
数仓模式
用户直接上传数据到OSS,并期望构建可分析与计算的标准数据仓库。
自由模式
已存在OSS数据,但OSS的路径不清晰。期望通过元信息发现,构建可分析的库表分区。
RDS
Tablestore
PolarDB
Redis
MongoDB
AnalyticDB for MySQL
支持写入sink
OSS
元数据发现
OSS数据源
Tablestore数据源
SLS的OSS投递数据源
痛点
多源头数据需要统⼀存储管理,并需要便捷的融合分析。
源头数据元信息不确定或变化大,需要⾃动识别和管理;简单的元信息发现功能时效性不够。
全量建仓或直连数据库进行分析对源库造成的压⼒较大,需要卸载线上压⼒规避故障。
建仓延迟较⻓(T+1天),需要T+10m的低延迟入湖。
更新频繁致小文件多,分析性能差,需要Upsert⾃动合并。
海量数据在事务库或传统数仓中存储成本高,需要低成本归档。
源库⾏存储格式或非分析型格式,分析能力弱,需要⽀持列式存储格式。
⾃建⼤数据数据平台运维成本高,需要产品化、云原生、⼀体化的⽅案。
常见数仓的存储不开放,需要⾃建能力、开源可控。
数据入湖
DLA Lakehouse实时入湖
T+1全量同步一键建仓
T+1多库合并建仓
ActionTrail日志清洗
SQL
常见DDL
CREATE SCHEMA:创建SCHEMA/DATABASE。
CREATE TABLE:创建表。
DROP SCHEMA语法:删除SCHEMA/DATABASE。
DROP TABLE:删除表。
ALTER TABLE:更改表的结构及分区信息。
OSS的元数据操作
MSCK REPAIR TABLE:同步OSS数据源上实际的数据分区信息到元数据分区中。
MSCK REPAIR TABLE SYNC_DIR:同步OSS数据源 一个目录的分区信息到元数据分区中。
数据库的元数据
MSCK REPAIR DATABASE:自动关联源数据库的所有表。
CREATE TABLE LIKE MAPPING:自动根据源端的表的结构 推断表结构。
查询DDL
SHOW SCHEMAS:查询用户所有的SCHEMA/DATABASE。
SHOW TABLES:查询用户当前SCHEMA下的表。
SHOW CREATE TABLE:查看建表语句。
SHOW PARTITIONS:列出表的所有分区信息。
SHOW QUERY_TASK:查询用户的查询任务信息。
DML
INSERT:插入数据。
SELECT:查询。
ACL
GRANT:为账号授权。
REVOKE: 撤销账号权限。
数据类型
数值类型
TINYINT , 1 byte 有符号整数, -128 至 127
SMALLINT, 2 byte 有符号整数, -32,768 至 32,767
INT/INTEGER, 4 byte 有符号整数, -2,147,483,648 至 2,147,483,647
BIGINT, 8 byte 有符号整数, -9,223,372,036,854,775,808 至 9,223,372,036,854,775,807
FLOAT, 4 byte 单精度浮点数
DOUBLE, 8 byte 双精度浮点数
DECIMAL(p, s) ,固定精度和范围的数值类型
字符类型
STRING , 存储变长的超大文本字符,可以使用单引号或者双引号
VARCHAR, 存储变长文本,但是长度上只允许在1-65355之间
CHAR, 用来存储定长的字符
时间日期类
TIMESTAMP , 支持纳秒级别的UNIX时间戳
DATE, 用来存储日期数据:年月日,YYYY-MM-DD
集合数据
ARRAY , 格式为ARRAY,元素访问通过0下标开始
MAP , 是一组key-value元组集合,key只能是基本类型,值可以是任意类型。map的元素访问则使用[],例如map[‘key1’]
STRUCT , 类似对象,如果数据类型定义为colum1 STRUCT{var1 STRING,var2 int},使用colum1.var2 来访问
DDL
CREATE SCHEMA
CREATE TABLE
CREATE TABLE LIKE MAPPING
ALTER TABLE
DROP TABLE
DROP SCHEMA语法
CREATE VIEW
DROP VIEW
MSCK REPAIR DATABASE
MSCK REPAIR TABLE SYNC_DIR
MSCK REPAIR TABLE
SHOW
基于Tag和树状结构的库、表元信息管理
DML
SELECT
insert
KILL
函数
聚合函数: 聚合函数主要应用于一组数据计算出一个结果
IP地址解析函数: IP地址解析函数
二进制函数: 二进制函数
位运算函数: 位运算函数
转换函数: 尝试隐式转换数值类型和字符类型值到正确的类型
日期和时间函数: 日期和时间函数
漏斗数据分析函数: 漏斗数据分析函数
地理空间函数: 地理空间函数
JSON函数: JSON函数
数学函数: 数学函数
手机号码函数: 手机号码函数
事件路径分析函数: 事件路径分析函数支持按照事件时间和时间戳得到事件顺序发生的时间序列
SQL分析函数: SQL分析函数
字符串函数: 字符串函数
条件运算函数: 条件运算函数
窗口函数: 窗口函数
连接数据源
OSS
OceanBase
Kudu
HDFS
HiveMetastore
RDS
Tablestore
AnalyticDB for MySQL 2.0
AnalyticDB for MySQL 3.0
AnalyticDB for PostgreSQL
MongoDB
MaxCompute
ElasticSearch
Druid
Redis
CDN
连接
通过代码连接DLA
PreparedStatement
Druid
Druid连接池
JDBC程序连接
通过客户端连接DLA
DBeaver
Navicat for MySQL
SQL WorkBench/J
通过MySQL命令行工具连接DLA
MySQL命令行
Superset连接数据湖分析
spark开发
创建和执行Spark作业
作业配置指南
Spark UI
配置数据源网络
Spark SQL
Spark UDF
PySpark
Spark MLlib
Spark Streaming
生态工具
Airflow调度DLA Spark作业
Jupyter交互式作业开发
Spark-Submit命令行工具
Spark-SQL命令行工具
ETL调度
DataWorks
调度DLA Spark 任务
调度DLA Presto任务
循环调度DLA Presto任务
DMS
调度DLA Presto任务
调度DLA Spark任务
监控与报警
查看Presto监控
查看Spark监控
管理报警
实践
基于SLS+OSS+DLA构建海量、低成本日志分析方案
友盟数据分析
DCDN日志分析
联合查询多个MySQL实例
漏斗分析
OSS文件类型转换(SQL模式)
快速搭建DataV数据大屏
Quick BI
JSON数据行列转换
API
Serverless Spark
SubmitSparkSQL
SubmitSparkJob
GetJobStatus
GetJobDetail
GetJobLog
GetJobAttemptLog
KillSparkJob
ListSparkJob
ListSparkJobAttempt
ListSparkStatements
GetSparkStatement
ExecuteSparkStatement
CancelSparkStatement
GetSparkSessionState
服务管理
CreateInstance
ReleaseInstance
ValidateVirtualClusterName
元数据
权限管理
GrantPrivileges
RevokePrivileges
数据库管理
CreateDatabase
GetDatabase
GetAllDatabases
AlterDatabase
DropDatabase
分区管理
AddPartitions
GetPartition
GetPartitions
DropPartition
表管理
CreateTable
GetTable
GetAllTables
AlterTable
DropTable
Iceberg
简介
Netflix开发的数据湖产品
是一种可伸缩的表存储格式,内置了许多最佳实践
和HIVE一样成为开放的静态数据存储标准, 标准清晰, 和语言无关和项目无关
强大的扩展性以及可靠性: 透明简单的使用, 用户只需写入数据所有元数据的变更都是自动和底层序列化方式无关的, 支持并发写
解决存储可用性问题: 更好的schema管理方式、时间旅行、多版本回滚支持等
传统数仓痛点
流批混合的作业难以基于同一套基础组件搭建;
难以保证端到端的”有且仅有一次“和”强一致“的语义;
流批衔接,即流式数据落地,通常环节多,流程长,时效性差;
难以保证ACID事务和读写分离,导致下游出现脏读等错误;如果通过外部逻辑实现ACID事务和强一致性,会进一步加长整个流程;
已写入的数据很难修正,或者只能以数据文件甚至整个分区这种较大的粒度进行操作,费时费力;
数据落地和处理过程难以实现端到端的增量处理等。
术语
数据文件(data files)
清单文件(Manifest file)
清单列表(Manifest list)
快照(Snapshot)
快照
快照(snapshot)表示表数据文件的一个完整集合
每次更新操作会生成一个新的快照。
读取数据的时候使用当前的快照,Iceberg 使用乐观锁机制来创建新的快照,然后提交
优点
优化数据入库流程:Iceberg 提供 ACID 事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了 ETL;Iceberg 提供了 upsert、merge into 能力,可以极大地缩小数据入库延迟;
支持更多的分析引擎:优秀的内核抽象使之不绑定特定的计算引擎,目前 Iceberg 支持的计算引擎有 Spark、Flink、Presto 以及 Hive。
统一数据存储和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg 支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持 Parquet、Avro 以及 ORC 等存储格式。
增量读取处理能力:Iceberg 支持通过流式方式读取增量数据,支持 Structed Streaming 以及 Flink table Source。
所有的修改都是原子性的;
没有耗时的文件系统操作;
快照是索引好的,以便加速读取;
CBO metrics 信息是可靠的;
更新支持版本,支持物化视图。
新 partition 模式:避免了查询时n次调用 namenode 的 list 方法,降低 namenode 压力,提升查询性能
新 metadata 模式:文件级别列统计信息可以用来根据 where 字段进行文件过滤,很多场景下可以大大减少扫描文件数,提升查询性能
元数据组织
实现基于快照的跟踪方式;
表的元数据是不可修改的,并且始终向前迭代;
当前的快照可以回退。
表写入
Overwrit的行为dynamic overwrite,即当某个partition中含有输入DataFrame中的行的时候,该partition才会被新数据完全覆盖;其他partition则保持不变。而Spark 2.4中原生数据源(如parquet)的默认行为是static overwrite;
操作粒度是文件级别,并不是行级别;
mode必须显式指定,没有默认行为。
iceberg能力
ACID事务;
时间旅行(time travel),以访问之前版本的数据;
完备的自定义类型、分区方式和操作的抽象;
列和分区方式可以进化,而且进化对用户无感,即无需重新组织或变更数据文件;
隐式分区,使SQL不用针对分区方式特殊优化;
面向云存储的优化等;
flink集成
写入
相对底层的DataStream实现:实现SinkFunction和checkpoint相关接口
相对高层的Table和SQL实现:按照对insert、delele和update的不同要求,实现StreamTableSink相关接口
读取
相对底层的DataStream实现:实现SourceFunctin和checkpoint相关接口
相对高层的Table和SQL实现:实现StreamTableSource,并辅助以ProjectableTableSource以支持projection从Flink下推到Iceberg,以及FilterableTableSource以支持表达式下推到Iceberg
数据和元数据的压缩合并
Minor compaction:仅合并元数据文件(rewrite manifest),不操作数据文件;
Major compaction:合并元数据和数据文件。未来还需要处理数据文件和Merge-on-Read产生的delete文件的合并;
Optimization:合并元数据和数据文件的同时,清理过期的snapshot以及这些snapshot对应的元数据和数据文件。
目标
1、成为静态数据交换的开放规范,维护一个清晰的格式规范,支持多语言,支持跨项目的需求等。
2、提升扩展性和可靠性。能够在一个节点上运行,也能在集群上运行。所有的修改都是原子性的,串行化隔离。原生支持云对象存储,支持多并发写
3、修复持续的可用性问题,比如模式演进,分区隐藏,支持时间旅行、回滚等
规划
1、支持 Spark 向量化以便实现快速的 bulk read,Presto 向量化已经支持。
2、行级别的删除,支持 MERGE INTO 等
0 条评论
下一页