Options for handling existing small files in Hive

2023-07-04 14:49:53

Option 1

Archiving (archive). Hive has built-in support for converting the files of an existing partition into a Hadoop Archive (HAR), so that a partition that once consisted of, say, 100 files shrinks to only about 3 files (depending on settings).

However, once archived, a partition can only be queried; updates and writes are not supported.

set hive.archive.enabled=true;
set hive.archive.har.parentdir.settable=true;
set har.partfile.size=1099511627776;

#Archive a specific partition of the table
alter table test_rownumber2 archive partition(dt='20230324');

#Unarchive
alter table test_rownumber2 unarchive partition(dt='20230324');

Reference: https://www.docs4dev.com/docs/zh/apache-hive/3.1.1/reference/LanguageManual_Archiving.html

Option 2

For ORC files, you can use Hive's built-in concatenate command to merge small files automatically.

#For a non-partitioned table
alter table A concatenate;

#For a partitioned table
alter table B partition(dt='2021-05-07',hr='12') concatenate;

Notes:

1. The concatenate command only supports the RCFILE and ORC file formats.

2. You cannot specify the resulting number of files when merging with concatenate, but you can run the command multiple times.

3. When repeated runs of concatenate no longer reduce the file count, this is governed by the parameter mapreduce.input.fileinputformat.split.minsize (e.g. 256 MB), which sets the minimum size of each resulting file.
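As a rough sanity check, the floor that split.minsize puts on the file count can be estimated with a bit of arithmetic (a sketch; the partition size used here is a made-up example):

```shell
# Hypothetical numbers: a 3 GB partition with split.minsize = 256 MB.
# concatenate will not reduce the partition much below total/minsize files.
minsize=$((256 * 1024 * 1024))     # mapreduce.input.fileinputformat.split.minsize
total=$((3 * 1024 * 1024 * 1024))  # assumed total partition size
floor=$(( (total + minsize - 1) / minsize ))  # ceiling division
echo "concatenate floor: about ${floor} files"
```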

Option 3 (recommended)

Regenerate the data with SQL, using INSERT OVERWRITE.

Create an intermediate table with the same structure as the original table, then load the data from the original table into it. After confirming the data is consistent, rename the intermediate table to the original table name, rename the original table to a temporary name, and finally drop the temporary table.

#1. Create the intermediate table (its structure must match the original table)
create table test.test_table_hive_merge like test.test_table_hive;

#If the data is stored on COS/OFS, you may need to change the storage location after creating the table; the default is an HDFS path.
alter table test_table_hive_merge set location 'cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive_merge';


#2. Set session parameters for file merging
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=3000;
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.tezfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.size.per.task=512000000;


#3. Merge the files into the intermediate table. Make sure nothing writes to the original table while this runs.
#If the table has multiple partition levels, list all partition columns in the partition clause
INSERT OVERWRITE TABLE test.test_table_hive_merge  partition(batch_date) SELECT * FROM test.test_table_hive;


#4. Compare row counts between the original table and the intermediate table
set hive.compute.query.using.stats=false;
set hive.fetch.task.conversion=none;
SELECT count(*) FROM test.test_table_hive;
SELECT count(*) FROM test.test_table_hive_merge;


#5. After confirming the counts match, rename the original table to a temporary name and the intermediate table to the original name, using ALTER statements.
alter table test.test_table_hive rename to test.test_table_hive_tmp;
alter table test.test_table_hive_merge rename to test.test_table_hive;


#6. Check the partition count and the number of small files after the merge
hdfs dfs -count  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive
hdfs dfs -du -h  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608
hdfs dfs -du -h  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608 | wc -l


#7. Drop the temporary table after observing for a while
drop table test.test_table_hive_tmp;

Note: renaming a Hive table changes the table's storage path. If any job uploads data directly to the old path, it may need to be updated.
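The count comparison in step 4 can also be scripted so the rename only happens when the numbers match. A minimal sketch; the two counts below are hard-coded stand-ins for the output of the `hive -e "SELECT count(*) ..."` calls:

```shell
# In practice capture these from hive, e.g.:
#   orig=$(hive -e "SELECT count(*) FROM test.test_table_hive;")
orig=1000000     # hypothetical row count of the original table
merged=1000000   # hypothetical row count of the intermediate table
if [ "$orig" -eq "$merged" ]; then
    echo "counts match: safe to swap table names"
else
    echo "counts differ (${orig} vs ${merged}): do NOT rename" >&2
    exit 1
fi
```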

Related script:

#!/bin/bash

if [ ! -n "$1" ] ;then
    echo "No input file given. The file should contain table names, e.g. default.test"
    exit 1
else
    echo "the file you input is $1"
fi

hive=/usr/local/service/hive/bin/hive
#If using beeline instead, change the HiveServer2 address below
#hive=' /usr/local/service/hive/bin/beeline --showDbInPrompt=true -u "jdbc:hive2://10.0.0.30:7001" -n hadoop --outputformat=tsv2 --showHeader=false --silent=true '
#Session parameters to pass to each merge statement
hive_sets=" set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.max.dynamic.partitions=3000; set  hive.exec.max.dynamic.partitions.pernode=5000; set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; set hive.merge.tezfiles=true; set hive.merge.sparkfiles=true; set hive.merge.smallfiles.avgsize=256000000; set hive.merge.size.per.task=512000000; "
#Temporary file holding the partition listing
touch /tmp/partitions_tmp.txt

#Supports up to 4 levels of partitioning
funGetPart(){
    pCount=$1 ;
    tName=$2 ;
    taHe=`$3 -n 1 /tmp/partitions_tmp.txt ` ;
    if [ ${pCount} -eq 2 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1}')
    elif [ ${pCount} -eq 4 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3}')
    elif [ ${pCount} -eq 6 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3","$5}')
    elif [ ${pCount} -eq 8 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3","$5","$7}')
    else
        echo -e "\033[31m Parsing is not supported \n\033[0m"
        exit 1
    fi

}

lineTables=$(cat $1)

for dataTab in ${lineTables}
do
    #Check that the table name is in the expected db.table format
    dataCount=` echo ${dataTab}| awk -F "." '{print NF}' ` 
    if [ ${dataCount} -ne 2  ];then
       echo -e "\033[31m The file content is incorrect!!! \033[0m"
       echo -e "\033[31m For example: default.test \n\033[0m"
       exit 1
    fi

    #Check that the table has partitions
    $hive  -e "SHOW PARTITIONS ${dataTab} ; " > /tmp/partitions_tmp.txt
    if [ ! -s /tmp/partitions_tmp.txt ];then
        echo -e "\033[31m The table => ${dataTab} <= has no partition information \n\033[0m"
        exit 1
    fi
    ##First compare the partition field counts of the two sampled rows
    tpartCount=`(tail -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`
    hpartCount=`(head -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`    
    if [ "${tpartCount}" = "" ] || [ "${hpartCount}" = "" ];then
        echo -e "\033[31m The table => ${dataTab} <= has no partition information \n\033[0m"
        exit 1
    fi
    if [ ${tpartCount} -ne ${hpartCount} ];then
        echo -e "\033[31m The table => ${dataTab} <= has rows with different numbers of partition levels \n\033[0m"
        exit 1
    fi
    ##Then compare the partition column names of the two sampled rows
    funGetPart ${hpartCount} ${dataTab} head
    hpartValue=${partition}
    funGetPart ${tpartCount} ${dataTab} tail
    tpartValue=${partition}
    if [ "${hpartValue}" != "${tpartValue}" ];then
        echo -e "\033[31m The table => ${dataTab} <= has rows with different partition columns \n\033[0m"
        exit 1
    fi    


    echo "Processing table: ${dataTab} , partitions: ${tpartValue} "
    echo -e "==> Create table :  ${dataTab}_merge  from: ${dataTab} \n"
    #If the intermediate table already exists, the create below fails and the script exits; uncomment the next line to continue instead
    #${hive} -e "create table if not exists ${dataTab}_merge like ${dataTab} ; "
    ${hive} -e "create table ${dataTab}_merge like ${dataTab} ; "
    if [ $? -ne 0 ];then
       echo -e "\033[31m  ${dataTab}_merge create failed \n\033[0m"
       exit 1
    fi
    echo -e "==> Overwrite table :  ${dataTab}_merge , partition: ${tpartValue} , from: ${dataTab} \n"
    ${hive} -e " ${hive_sets} INSERT OVERWRITE TABLE  ${dataTab}_merge partition(${tpartValue}) SELECT * FROM ${dataTab} ; "
    if [ $? -eq 0 ];then
       echo -e "\033[32m  ${dataTab}_merge overwrite success \n\033[0m"
       #Rename the original table to a temporary name, then the intermediate table to the original name
       echo -e "==> Rename  ${dataTab} To ${dataTab}_tmp \n"
       ${hive} -e " alter table ${dataTab} rename to ${dataTab}_tmp ; "
       echo -e "==> Rename  ${dataTab}_merge To ${dataTab} \n"
       ${hive} -e " alter table  ${dataTab}_merge rename to ${dataTab} ; "
       echo -e "==> Table to drop later: ${dataTab}_tmp ; drop table ${dataTab}_tmp \n"
    else
       echo -e "\033[31m => ${dataTab} <= merge failed \n\033[0m"
       exit 1
    fi

sleep 1
done
wait
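The awk parsing inside funGetPart can be exercised on its own with a sample line of SHOW PARTITIONS output (two partition levels here):

```shell
# A 2-level partition line splits into 4 fields on '=' or '/',
# and the partition column names sit in fields 1 and 3.
line='batch_date=20210608/hr=12'
pCount=$(echo "$line" | awk -F '=|/' '{print NF}')
partition=$(echo "$line" | awk -F '=|/' '{print $1","$3}')
echo "${pCount} fields -> partition columns: ${partition}"
```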

Option 4

For text-format files, you can use hadoop getmerge to merge small files.

Use the getmerge command to merge the data down to the local filesystem first, then upload it back with put.

hadoop fs -getmerge  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*  /home/hadoop/pdate/20220815

hadoop fs -rm  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*

hadoop fs -mkdir -p /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815

hadoop fs -put  /home/hadoop/pdate/20220815  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/
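What getmerge does is essentially concatenate every file under a directory into one local file. The effect can be sketched locally with plain cat (throwaway temp files stand in for the HDFS part files):

```shell
# Simulate getmerge on local temp files: many part files -> one merged file
dir=$(mktemp -d)
printf 'row1\n' > "$dir/part-00000"
printf 'row2\n' > "$dir/part-00001"
printf 'row3\n' > "$dir/part-00002"
cat "$dir"/part-* > "$dir/merged"   # local stand-in for hadoop fs -getmerge
lines=$(wc -l < "$dir/merged")
echo "merged file has ${lines} rows"
rm -rf "$dir"
```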

Script for reference:

#!/bin/bash

if [ ! -n "$1" ] ;then
    echo "No input file given. The file should contain HDFS partition paths."
    exit 1
else
    echo "the file you input is $1"
fi

lineurl=$(cat $1)

hadoop=/usr/local/service/hadoop/bin/hadoop

localbak=/home/hadoop/pdate

for line in $lineurl
do

    echo "Processing:  $line "
    file_name=`echo $line | awk -F "=" '{print $NF}'`
    table_name=`echo $line | awk -F "/" '{print $(NF-1)}'`

    mkdir -p ${localbak}/${table_name}
    echo "=> Getmerge  data to ${localbak}/${table_name}/${file_name} "  
    ${hadoop} fs -getmerge  ${line}/*  ${localbak}/${table_name}/${file_name}
    echo "=> Rm ${line} "
    ${hadoop} fs -rm ${line}/*
    ${hadoop} fs -mkdir -p ${line}
    echo "=> Put  ${localbak}/${table_name}/${file_name}  To  ${line}  "  
    ${hadoop} fs -put ${localbak}/${table_name}/${file_name}  ${line}/

    ${hadoop} fs -test -f ${line}/${file_name}
    if [ $? -eq 0 ];then
       echo -e "\033[32m $line merged success \n\033[0m"
    else
       echo -e "\033[31m $line merged fail \n\033[0m"
    fi

sleep 1
done
wait
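The awk extraction of file_name and table_name at the top of the loop can be checked on a sample partition path:

```shell
# For .../xxxx.db/xxxx/pdate=20220815 the last '='-separated field is the
# partition value and the second-to-last '/'-separated field is the table dir.
line=/user/hive/warehouse/xxxx.db/xxxx/pdate=20220815
file_name=$(echo "$line" | awk -F "=" '{print $NF}')
table_name=$(echo "$line" | awk -F "/" '{print $(NF-1)}')
echo "table: ${table_name}, file: ${file_name}"
```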

References:

https://jishuin.proginn.com/p/763bfbd631ad

https://cloud.tencent.com/developer/article/1514064

https://developer.aliyun.com/article/952137

https://www.jianshu.com/p/e5e4fd4b039c
