上一遍记录了当时集群资源死锁的问题,后来想了想其实小文件较多也会让集群变慢,小文件较多在执行作业时rpc时间就会增加,从而拖垮了job的执行速度。
常见的小文件处理策略
目前比较常见的小文件处理策略主要包含以下几种:
1. 在数据进入集群之前,将小文件进行合并
2. 小文件写入集群之后,定期合并小文件
3. 使用HBase存储数据
4. 使用HAR格式
1.1写入前合并
这种方式,很容易理解,但是在实际实现过程中往往比较难实现。例如,实时系统中,往往因为时间间隔小,而导致数据通常都比较小。
1.2写入后合并
这种方式,是目前最经常使用 的方式。通常使用一个MR任务来对小文件进行合并操作,也就是将多个小文件合并成为大文件,然后删除原有小文件的操作。对于部分计算引擎,本身支持对结果文件进行合并的功能,例如 Hive。
1.3使用HBase存储数据
HBase本身具有Compacation机制,会对数据进行归并的操作。因此能够比较好的规避小文件的问题,但是HBase的数据存储适合固定场景,不能够满足所有场景的需求。
1.4使用Hadoop Archive
Hadoop Archive是一种特殊的数据格式,通常一个Hadoop Archive对应了一组数据目录,同时包含数据信息以及相应的索引数据。
可以使用如下的命令创建Hadoop Archive
hadoop archive -archiveName name -p <parent> [-r <replication factor>] <src>* <dest> |
---|
创建完成后的har文件,可以像使用正常hadoop命令来进行访问,在MR中访问也可以像正常HDFS文件一样,区别是需要更换一个协议。
har:///archivepath/fileinarchive |
---|
写入后合并
2.1运算结果数据小文件
解决运算结果数据小文件的问题,需要从以下两个角度进行考虑:
1. 对于后续会生成的运算结果,需要在运算的过程中控制结果文件的大小。
2. 对于已经在集群上的运算结果,采取文件合并的方式
由于不同的引擎,相应使用的方法不同,目前集群主要使用了hive,Impala,Spark进行数据计算。接下来分别介绍这三种引擎的小文件处理方式。
2.2Hive
Hive支持结果文件的合并功能,可以通过设定参数,在任务最后,对结果文件进行合并。以下是常用的设定。
参数设定 | 参数含义 | 默认值 |
---|---|---|
hive.merge.mapfiles | Merge small files at the end of a map-only job. | True |
hive.merge.mapredfiles | Merge small files at the end of a map-reduce job. | False |
hive.merge.size.per.task | Size of merged files at the end of the job. | 256000000 |
hive.merge.smallfiles.avgsize | When the average output file size of a job is less than this number, Hive will start an additional map-reduce job to merge the output files into bigger files. This is only done for map-only jobs if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true. | 16000000 |
hive.merge.orcfile.stripe.level | When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled while writing a table with ORC file format, enabling this configuration property will do stripe-level fast merge for small ORC files. | True |
因此,如果希望对Hive的运算结果文件大小进行控制,
只需要在运行hive sql时添加如下设定即可启用小文件合并功能。
SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; SET hive.merge.mapfiles=true; SET hive.merge.mapredfiles=true;
Hive分区小文件合并
如果是数据已经运算完毕,小文件已经产生,可以通过如下的语句将小文件进行合并。
SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; SET hive.merge.mapfiles=true; SET hive.merge.mapredfiles=true; Insert overwrite table t partition (partcol1=val1 …) select * from table t where partcol1=val1;
即将分区的数据进行重新,于此同时启用hive合并小文件的功能。
Hive on Spark
和传统的Hive on MR类似,Hive on Spark同样支持小文件合并功能。可以通过设置hive.merge.sparkfiles=true,来启用该功能。
https://issues.apache.org/jira/browse/HIVE-8043,Support merging small files[Spark Branch]提供了小文件合并的功能,从Hive1.1.0版本开始支持。
因此,与Hive on MR类似。在进行数据运算时,可以通过添加该参数来实现最终的小文件合并。
如果,小文件已经生成,可以通过如下的语句重新分区。
SET hive.merge.sparkfiles=true SET hive.merge.size.per.task=256000000; SET hive.merge.smallfiles.avgsize=16000000000; Insert overwrite table t partition (partcol1=val1 …) select * from table t where partcol1=val1;
Impala
由于Hive和Impala互通元数据库,并且,Hive可以直接操作Impala的数据,因此可以使用Hive的方式对Impala数据进行合并。
对于parquet文件格式,可以通过如下的设定,设定单个Parquet文件的大小。
例如
set PARQUET_FILE_SIZE=512m; INSERT OVERWRITE parquet_table SELECT * FROM text_table; |
---|
1.1.1.2 Spark
Spark在进行运算时,往往因为尽量并行化的需求,partition比较多,最终生成的结果按照Partition生成了很多碎小的结果文件,也是导致Spark结果文件比较小的主要原因。这种情况下,可以调用下面的方法,将分区缩小,从而将最终的结果文件个数会少,但是相对每个文件都会大很多。
DataFrame.coalesceval numbersDf2 = numbersDf.coalesce(2) |
---|
该方法会按照指定的partition数目,产生一个新的DataFrame,通常用于缩减Partition数目。