关于较大规模hadoop集群的小文件问题

2021-01-06 17:44:17 浏览数 (1)

上一遍记录了当时集群资源死锁的问题,后来想了想其实小文件较多也会让集群变慢,小文件较多在执行作业时rpc时间就会增加,从而拖垮了job的执行速度。

常见的小文件处理策略

目前比较常见的小文件处理策略主要包含以下几种:

1. 在数据进入集群之前,将小文件进行合并

2. 小文件写入集群之后,定期合并小文件

3. 使用HBase存储数据

4. 使用HAR格式

1.1写入前合并

这种方式,很容易理解,但是在实际实现过程中往往比较难实现。例如,实时系统中,往往因为时间间隔小,而导致数据通常都比较小。

1.2写入后合并

这种方式,是目前最经常使用 的方式。通常使用一个MR任务来对小文件进行合并操作,也就是将多个小文件合并成为大文件,然后删除原有小文件的操作。对于部分计算引擎,本身支持对结果文件进行合并的功能,例如 Hive。

1.3使用HBase存储数据

HBase本身具有Compacation机制,会对数据进行归并的操作。因此能够比较好的规避小文件的问题,但是HBase的数据存储适合固定场景,不能够满足所有场景的需求。

1.4使用Hadoop Archive

Hadoop Archive是一种特殊的数据格式,通常一个Hadoop Archive对应了一组数据目录,同时包含数据信息以及相应的索引数据。

可以使用如下的命令创建Hadoop Archive

hadoop archive -archiveName name -p <parent> [-r <replication factor>] <src>* <dest>

创建完成后的har文件,可以像使用正常hadoop命令来进行访问,在MR中访问也可以像正常HDFS文件一样,区别是需要更换一个协议。

har:///archivepath/fileinarchive

写入后合并

2.1运算结果数据小文件

解决运算结果数据小文件的问题,需要从以下两个角度进行考虑:

1. 对于后续会生成的运算结果,需要在运算的过程中控制结果文件的大小。

2. 对于已经在集群上的运算结果,采取文件合并的方式

由于不同的引擎,相应使用的方法不同,目前集群主要使用了hive,Impala,Spark进行数据计算。接下来分别介绍这三种引擎的小文件处理方式。

2.2Hive

      Hive支持结果文件的合并功能,可以通过设定参数,在任务最后,对结果文件进行合并。以下是常用的设定。

参数设定

参数含义

默认值

hive.merge.mapfiles

Merge small files at the end of a map-only job.

True

hive.merge.mapredfiles

Merge small files at the end of a map-reduce job.

False

hive.merge.size.per.task

Size of merged files at the end of the job.

256000000

hive.merge.smallfiles.avgsize

When the average output file size of a job is less than this number, Hive will start an additional map-reduce job to merge the output files into bigger files. This is only done for map-only jobs if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true.

16000000

hive.merge.orcfile.stripe.level

When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled while writing a table with ORC file format, enabling this configuration property will do stripe-level fast merge for small ORC files.

True

因此,如果希望对Hive的运算结果文件大小进行控制,

只需要在运行hive sql时添加如下设定即可启用小文件合并功能。

SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; SET hive.merge.mapfiles=true; SET hive.merge.mapredfiles=true;

Hive分区小文件合并

如果是数据已经运算完毕,小文件已经产生,可以通过如下的语句将小文件进行合并。

SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; SET hive.merge.mapfiles=true; SET hive.merge.mapredfiles=true; Insert overwrite table  t  partition (partcol1=val1 …) select * from table t where partcol1=val1;

即将分区的数据进行重新,于此同时启用hive合并小文件的功能。

Hive on Spark

和传统的Hive on MR类似,Hive on Spark同样支持小文件合并功能。可以通过设置hive.merge.sparkfiles=true,来启用该功能。

https://issues.apache.org/jira/browse/HIVE-8043,Support merging small files[Spark Branch]提供了小文件合并的功能,从Hive1.1.0版本开始支持。

因此,与Hive on MR类似。在进行数据运算时,可以通过添加该参数来实现最终的小文件合并。

如果,小文件已经生成,可以通过如下的语句重新分区。

SET hive.merge.sparkfiles=true SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; Insert overwrite table  t  partition (partcol1=val1 …) select * from table t where partcol1=val1;

Impala

由于Hive和Impala互通元数据库,并且,Hive可以直接操作Impala的数据,因此可以使用Hive的方式对Impala数据进行合并。

对于parquet文件格式,可以通过如下的设定,设定单个Parquet文件的大小。

例如

set PARQUET_FILE_SIZE=512m; INSERT OVERWRITE parquet_table SELECT * FROM text_table;

1.1.1.2 Spark

Spark在进行运算时,往往因为尽量并行化的需求,partition比较多,最终生成的结果按照Partition生成了很多碎小的结果文件,也是导致Spark结果文件比较小的主要原因。这种情况下,可以调用下面的方法,将分区缩小,从而将最终的结果文件个数会少,但是相对每个文件都会大很多。

DataFrame.coalesceval numbersDf2 = numbersDf.coalesce(2)

该方法会按照指定的partition数目,产生一个新的DataFrame,通常用于缩减Partition数目。

0 人点赞