hive调优
2022-04-07 13:22:28 0 举报
AI智能生成
hive调优记录整理总结
作者其他创作
大纲/内容
6.向量化
Hive中的向量化查询执行大大减少了典型查询操作(如扫描,过滤器,聚合和连接)的CPU使用率。
标准查询执行系统一次处理一行,在处理下一行之前,单行数据会被查询中的所有运算符进行处理,导致CPU使用效率非常低。在向量化查询执行中,数据行被批处理在一起(默认=> 1024行),表示为一组列向量。
要使用向量化查询执行,必须以ORC格式(CDH 5)存储数据,并设置以下变量。
SET hive.vectorized.execution.enabled=true
在CDH 6中默认启用Hive查询向量化,启用查询向量化后,还可以设置其他属性来调整查询向量化的方式,具体可以参考cloudera官网
7.谓词下推
默认生成的执行计划会在可见的位置执行过滤器,但在某些情况下,某些过滤器表达式可以被推到更接近首次看到此特定数据的运算符的位置。
select a.*, b.* from a join b on (a.col1 = b.col1)where a.col1 > 15 and b.col2 > 16
如果没有谓词下推,则在完成JOIN处理之后将执行过滤条件**(a.col1> 15和b.col2> 16)**。因此,在这种情况下,JOIN将首先发生,并且可能产生更多的行,然后在进行过滤操作。
使用谓词下推,这两个谓词**(a.col1> 15和b.col2> 16)**将在JOIN之前被处理,因此它可能会从a和b中过滤掉连接中较早处理的大部分数据行,因此,建议启用谓词下推。
通过将hive.optimize.ppd设置为true可以启用谓词下推
SET hive.optimize.ppd=true
8.输入格式选择
Hive支持TEXTFILE, SEQUENCEFILE, AVRO, RCFILE, ORC,以及PARQUET文件格式,可以通过两种方式指定表的文件格式:
CREATE TABLE … STORE AS :即在建表时指定文件格式,默认是TEXTFILE
ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT :修改具体表的文件格式
ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT :修改具体表的文件格式
如果未指定文件存储格式,则默认使用的是参数hive.default.fileformat设定的格式。
如果数据存储在小于块大小的小文件中,则可以使用SEQUENCE文件格式。如果要以减少存储空间并提高性能的优化方式存储数据,则可以使用ORC文件格式,而当列中嵌套的数据过多时,Parquet格式会很有用。因此,需要根据拥有的数据确定输入文件格式。
9.启动严格模式
如果要查询分区的Hive表,但不提供分区谓词(分区列条件),则在这种情况下,将针对该表的所有分区发出查询,这可能会非常耗时且占用资源。因此,我们将下面的属性定义为strict,以指示在分区表上未提供分区谓词的情况下编译器将引发错误。
SET hive.partition.pruning=strict
10.基于成本的优化
Hive在提交最终执行之前会优化每个查询的逻辑和物理执行计划。基于成本的优化会根据查询成本进行进一步的优化,从而可能产生不同的决策:比如如何决定JOIN的顺序,执行哪种类型的JOIN以及并行度等。
可以通过设置以下参数来启用基于成本的优化。
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
可以使用统计信息来优化查询以提高性能。基于成本的优化器(CBO)还使用统计信息来比较查询计划并选择最佳计划。通过查看统计信息而不是运行查询,效率会很高。
收集表的列统计信息:
ANALYZE TABLE mytable COMPUTE STATISTICS FOR COLUMNS;
查看my_db数据库中my_table中my_id列的列统计信息:
DESCRIBE FORMATTED my_db.my_table my_id
1.多次INSERT单次扫描表
FROM my_table
INSERT INTO temp_table_20201115 SELECT * WHERE dt ='2020-11-15'
INSERT INTO temp_table_20201116 SELECT * WHERE dt ='2020-11-16';
INSERT INTO temp_table_20201115 SELECT * WHERE dt ='2020-11-15'
INSERT INTO temp_table_20201116 SELECT * WHERE dt ='2020-11-16';
2.分区表
CREATE TABLE table_name (
col1 data_type,
col2 data_type
)PARTITIONED BY (
partition1 data_type, partition2 data_type,….
);
col1 data_type,
col2 data_type
)PARTITIONED BY (
partition1 data_type, partition2 data_type,….
);
3.分桶表
通常,当很难在列上创建分区时,我们会使用分桶,比如某个经常被筛选的字段,如果将其作为分区字段,会造成大量的分区。在Hive中,会对分桶字段进行哈希,从而提供了中额外的数据结构,进行提升查询效率。
与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可以确保某个key对应的数据在一个特定的桶内(文件),所以巧妙地选择分桶字段可以大幅度提升join的性能。通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。
我们可以使用set.hive.enforce.bucketing = true启用分桶设置。
当使用分桶表时,最好将bucketmapjoin标志设置为true,具体配置参数为
SET hive.optimize.bucketmapjoin = true
CREATE TABLE table_name PARTITIONED BY (partition1 data_type, partition2 data_type,….) CLUSTERED BY (column_name1, column_name2, …) SORTED BY (column_name [ASC|DESC], …)] INTO num_buckets BUCKETS;
4.对中间数据启用压缩
复杂的Hive查询通常会转换为一系列多阶段的MapReduce作业,并且这些作业将由Hive引擎连接起来以完成整个查询。因此,此处的“中间输出”是指上一个MapReduce作业的输出,它将用作下一个MapReduce作业的输入数据。
压缩可以显著减少中间数据量,从而在内部减少了Map和Reduce之间的数据传输量。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
为了将最终输出到HDFS的数据进行压缩,可以使用以下属性
set hive.exec.compress.output=true;
下面是一些可以使用的压缩编解码器
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
com.hadoop.compression.lzo.LzopCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
com.hadoop.compression.lzo.LzopCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
5.Map端JOIN
map端join适用于当一张表很小(可以存在内存中)的情况,即可以将小表加载至内存。Hive从0.7开始支持自动转为map端join,具体配置如下:
SET hive.auto.convert.join=true;
-- hivev0.11.0之后默认trueSET
hive.mapjoin.smalltable.filesize=600000000;
-- 默认 25m
SET hive.auto.convert.join.noconditionaltask=true;
-- 默认true,所以不需要指定map join hint
SET hive.auto.convert.join.noconditionaltask.size=10000000;
-- 控制加载到内存的表的大小
-- hivev0.11.0之后默认trueSET
hive.mapjoin.smalltable.filesize=600000000;
-- 默认 25m
SET hive.auto.convert.join.noconditionaltask=true;
-- 默认true,所以不需要指定map join hint
SET hive.auto.convert.join.noconditionaltask.size=10000000;
-- 控制加载到内存的表的大小
一旦开启map端join配置,Hive会自动检查小表是否大于hive.mapjoin.smalltable.filesize配置的大小,如果大于则转为普通的join,如果小于则转为map端join。
首先,Task A(客户端本地执行的task)负责读取小表a,并将其转成一个HashTable的数据结构,写入到本地文件,之后将其加载至分布式缓存。
然后,Task B任务会启动map任务读取大表b,在Map阶段,根据每条记录与分布式缓存中的a表对应的hashtable关联,并输出结果
注意:map端join没有reduce任务,所以map直接输出结果,即有多少个map任务就会产生多少个结果文件。
数据倾斜
空值引发的数据倾斜
第一种:可以直接不让null值参与join操作,即不让null值有shuffle阶段
第二种:因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样它们的hash结果就不一样,就会进到不同的reduce中:
SELECT *
FROM log a
LEFT JOIN users b ON CASE
WHEN a.user_id IS NULL THEN concat('hive_', rand())
ELSE a.user_id
END = b.user_id;
SELECT *
FROM log a
LEFT JOIN users b ON CASE
WHEN a.user_id IS NULL THEN concat('hive_', rand())
ELSE a.user_id
END = b.user_id;
不同数据类型引发的数据倾斜
如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了:
不可拆分大文件引发的数据倾斜
这种数据倾斜问题没有什么好的解决方案,只能将使用GZIP压缩等不支持文件分割的文件转为bzip和zip等支持文件分割的压缩方式。
所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法。
所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法。
数据膨胀引发的数据倾斜
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
表连接时引发的数据倾斜
通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在Map阶段完成join操作,即MapJoin,这避免了 Shuffle,从而避免了数据倾斜。
MapJoin是Hive的一种优化操作,其适用于小表JOIN大表的场景,由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要启动Reduce任务也就不需要经过shuffle阶段,从而能在一定程度上节省资源提高JOIN效率。
在Hive 0.11版本及之后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机:
hive.auto.convert.join=true 默认值为true,自动开启MAPJOIN优化。
hive.mapjoin.smalltable.filesize=2500000 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中。
hive.auto.convert.join=true 默认值为true,自动开启MAPJOIN优化。
hive.mapjoin.smalltable.filesize=2500000 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中。
注意:使用默认启动该优化的方式如果出现莫名其妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化:
hive.auto.convert.join=false (关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false (不忽略MAPJOIN标记)
再提一句:将表放到Map端内存时,如果节点的内存很大,但还是出现内存溢出的情况,我们可以通过这个参数 mapreduce.map.memory.mb 调节Map端内存的大小。
hive.auto.convert.join=false (关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false (不忽略MAPJOIN标记)
再提一句:将表放到Map端内存时,如果节点的内存很大,但还是出现内存溢出的情况,我们可以通过这个参数 mapreduce.map.memory.mb 调节Map端内存的大小。
确实无法减少数据量引发的数据倾斜
这类问题最直接的方式就是调整reduce所执行的内存大小。
调整reduce的内存大小使用mapreduce.reduce.memory.mb这个配置。
调整reduce的内存大小使用mapreduce.reduce.memory.mb这个配置。
refer:https://mp.weixin.qq.com/s/9sVQrx4zC3ijTZgbrcqzFA
0 条评论
下一页