(学习之路)Hive数据倾斜解决办法

2023-02-25 18:48:50 浏览数 (2)

‍大家好,我是小轩

hive是基于大数据开发的一组用于数据仓库的api,其主要功能是将HQL(HIVE SQL)转换成MapReduce执行。所以对hive的优化几乎等于对MapReduce的优化,主要在io和数据倾斜方面进行优化。

本文主要在以下几个方面进行介绍

  • 合并小文件
  • 压缩文件
  • join倾斜优化
  • group by倾斜优化
  • 合并小文件

map针对每一个文件产生一个或多个map任务,如果输入小文件过多,则会产生许多map任务处理每个小文件,严重耗费了资源。

通过如下设置可以对输入小文件进行合并操作

代码语言:javascript复制
 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  • 压缩文件

可以通过压缩中间文件减少io消耗,提高效率

hive中存储格式和压缩格式如下:

  • 存储格式
  1. Text File text格式,此为默认的格式。可以使用Gzip或者Bzip2压缩格式,不支持分割
  2. SequenceFile 二进制文件格式,支持NONE/RECORD/BLOCK压缩格式
  3. RCFile
  4. Avro Files
  5. ORC Files
  6. Parquet 列存储格式,推荐使用此种文件格式
  7. Custom INPUTFORMAT and OUTPUTFORMAT 用户自定义文件格式
  • 压缩格式

压缩格式主要有 bzip2、gzip、lzo、snappy等

在进行shuffle中,由于进行数据传输,会产生较大的io。此时对map输出文件进行压缩,能够减小数据文件大小,降低io,提高执行效率,一般建议采用SnappyCodec压缩格式,此格式有较高的压缩比和低cpu消耗

代码语言:javascript复制
set hive. exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

数据倾斜指由于数据表中某些值数据量较大时,导致某些reducer上数据量较大。

在执行过程中会出现其它reducer都已完成,某些reducer还在执行且进度条一直呈现99%,严重影响了整个任务的执行效率。数据倾斜优化就是要解决某些值数据量较大的情况。

  • join 倾斜优化

join过程中出现的数据倾斜,具体解决办法为转map join和设置参数优化,关于join分为以下几种

  • map join

当大表和小表join出现数据倾斜时,可以将小表缓存至内存,在map端进行join操作,设置如下:

代码语言:javascript复制
set hive. auto.convert. join.noconditionaltask = true;
set hive. auto.convert. join.noconditionaltask.size = 10000000;
代码语言:javascript复制
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25M;
  • hive.optimize.skew join 参数

如果大表和大表进行join操作,则可采用skew join

skew join原理

  1. 对于skewjoin.key,在执行job时,将它们存入临时的HDFS目录。其它数据正常执行
  2. 对倾斜数据开启map join操作,对非倾斜值采取普通join操作
  3. 将倾斜数据集和非倾斜数据及进行合并操作

相关文档:

https://weidongzhou.wordpress.com/2017/06/08/join-type-in-hive-skewed-join/

https://cwiki.apache.org/confluence/display/Hive/Skewed Join Optimization

  • Join Type in Hive : Skewed Join

https://weidongzhou.wordpress.com/2017/06/08/join-type-in-hive-skewed-join/

1、hive.optimize.skewjoin.compiletime

如果建表语句元数据中指定了skew key,则使用set hive.optimize.skewjoin.compiletime=true开启skew join。

可以通过如下建表语句指定SKEWED key:

代码语言:javascript复制
CREATE TABLE list_bucket_single ( key STRING, value STRING)
SKEWED BY ( key) ON ( 1, 5, 6) [STORED AS DIRECTORIES];

2、hive.optimize.skewjoin

该参数为在运行时动态指定数据进行skewjoin,一般和hive.skewjoin.key参数一起使用

代码语言:javascript复制
set hive.optimize.skewjoin= true;
set hive.skewjoin. key= 100000;

以上参数表示当记录条数超过100000时采用skewjoin操作

3、区别

hive.optimize.skewjoin.compiletime和hive.optimize.skewjoin区别为前者为编译时参数,后者为运行时参数。前者在生成执行计划时根据元数据生成skewjoin,此参数要求倾斜值一定;后者为运行过程中根据数据条数进行skewjoin优化。hive.optimize.skewjoin实际上应该重名为为hive.optimize.skewjoin.runtime参数,考虑兼容性没有进行重命名

  • group by 倾斜优化

group by语句中出现的倾斜,通过改变写法或参数设置

  • 写法调整

对于确定的倾斜值,先均匀分布到各个reducer上,然后开启新一轮reducer进行统计操作。写法如下

代码语言:javascript复制
  -- 正常写法
select 
  key, count( 1) as cnt 
from tb_name 
group by key;

-- 改进后写法
select a. key
       , sum(cnt) as cnt
    from ( select key
               , if( key = 'key001',random(),0)
               , count( 1) as cnt
            from tb_name
          group by key, 
                    if( key = 'key001',random(),0)
         ) t
    group by t. key;
  • 参数设置

如果在不确定倾斜值的情况下,可以设置hive.groupby.skewindata参数

代码语言:javascript复制
set hive.groupby.skewindata= true;
select key
    , count( 1) as cnt
  from tb_name
group by key;

其原理和上述写法调整中类似,是先对key值进行均匀分布,然后开启新一轮reducer求值

以上优化方式为一般且常见的优化方式,对于具体问题应该进行具体分析

hive在跑数据时经常会出现数据倾斜的情况,使的作业经常reduce完成在99%后一直卡住,最后的1%花了几个小时都没跑完,这种情况就很可能是数据倾斜的原因,解决方法要根据具体情况来选择具体的方案

1、join的key值发生倾斜,key值包含很多空值或是异常值

这种情况可以对异常值赋一个随机值来分散key

如:

代码语言:javascript复制
select userid , name
from user_info a
join (
select  case  when userid  is  null  then  cast ( rand ( 47 )* 100000  as i nt )
else userid end
from user_read_log
)b  on a . userid  = b . userid

通过rand函数将为null的值分散到不同的值上,在key值比较就能解决数据倾斜的问题

注:对于异常值如果不需要的话,最好是提前过滤掉,这样计算量可以大大减少

2、当key值都是有效值时,解决办法为设置以下几个参数

代码语言:javascript复制
set hive.exec.reducers.bytes.per.reducer = 1000000000

也就是每个节点的reduce 默认是处理1G大小的数据,如果你的join 操作也产生了数据倾斜,那么你可以在hive 中设定

代码语言:javascript复制
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)

hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成你

(处理的总记录数/reduce个数)的2-4倍都可以接受.

倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的MapReducejob 都很容易产生倾斜,建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)

3、reduce数太少

代码语言:javascript复制
set mapred.reduce.tasks=800;

默认是先设置这个参数:

hive.exec.reducers.bytes.per.reducer,设置了后hive会自动计算reduce的个数,因此两个参数一般不同时使用

4、对于group by 产生倾斜的问题

set hive.map.aggr=true (开启map端combiner);

//在Map端做combiner,假如map各条数据基本上不一样, 聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数

代码语言:javascript复制
hive.groupby.mapaggr.checkinterval = 100000 (默认)
hive.map.aggr.hash.min.reduction=0.5(默认)

两个参数的意思是:预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合

set hive.groupby.skewindata=true;// 决定 group by 操作是否支持倾斜的数据。注意:只能对单个字段聚合. 控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce做次预汇总,减少某些key值条数过多某些key条数过小造成的数据倾斜问题

5、小表与大表关联

此时,可以通过mapjoin来优化

代码语言:javascript复制
set hive.auto. convert . join =  true ; //将小表刷入内存中  
set hive.mapjoin.smalltable.filesize = 2500000 ;//刷入内存表的大小(字节)

0 人点赞