小文件问题的影响 1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。 ————————————————
小文件问题的解决方案 从小文件产生的途经就可以从源头上控制小文件数量,方法如下:
代码语言:javascript复制1.使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件。
2.减少reduce的数量(可以使用参数进行控制)。
3.少用动态分区,用时记得按distribute by分区。
————————————————
对于已有的小文件,我们可以通过以下几种方案解决:
代码语言:javascript复制1.使用hadoop archive命令把小文件进行归档。
2.重建表,建表时减少reduce数量。
3.通过参数进行调节,设置map/reduce端的相关参数,如下:
设置map输入合并小文件的相关参数:
[java] view plain copy //每个Map最大输入大小(这个值决定了合并后文件的数量) set mapred.max.split.size=256000000; //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; //执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置map输出和reduce输出进行合并的相关参数: [java] view plain copy //设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 25610001000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000
————————————————
3.Write good SQL :
说道sql优化很惭愧,自己sql很烂,不多比比了,但是sql优化确实很关键。。。
4.存储格式:
可以使用列裁剪,分区裁剪,orc,parquet等存储格式。
代码语言:javascript复制Hive支持ORCfile,这是一种新的表格存储格式,通过诸如谓词下推,压缩等技术来提高执行速度提升。
对于每个HIVE表使用ORCFile应该是一件容易的事情,并且对于获得HIVE查询的快速响应时间非常有益。
作为一个例子,考虑两个大表A和B(作为文本文件存储,其中一些列未在此处指定,即行试存储的缺点)以及一个简单的查询,如:
SELECT A.customerID, A.name, A.age, A.address join
B.role, B.department, B.salary
ON A.customerID=B.customerID;
此查询可能需要很长时间才能执行,因为表A和B都以TEXT形式存储,进行全表扫描。
将这些表格转换为ORCFile格式通常会显着减少查询时间:
ORC支持压缩存储(使用ZLIB或如上所示使用SNAPPY),但也支持未压缩的存储。
CREATE TABLE A_ORC (
customerID int, name string, age int, address string
) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);
INSERT INTO TABLE A_ORC SELECT * FROM A;
CREATE TABLE B_ORC (
customerID int, role string, salary float, department string
) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);
INSERT INTO TABLE B_ORC SELECT * FROM B;
SELECT A_ORC.customerID, A_ORC.name,
A_ORC.age, A_ORC.address join
B_ORC.role, B_ORC.department, B_ORC.salary
ON A_ORC.customerID=B_ORC.customerID;
5.压缩格式:
压缩格式 | UNIX工具 | 算 法 | 文件扩展名 | 可分割 |
---|---|---|---|---|
DEFLATE | 无 | DEFLATE | .deflate | No |
gzip | gzip | DEFLATE | .gz | No |
LZ4 | 无 | LZ4 | .LZ4 | NO |
bzip | bzip | bzip | .bz2 | YES |
LZO | lzop | LZO | .lzo | YES if indexed |
Snappy | 无 | Snappy | .snappy | NO |
Size(MB).png
Time(s).png
大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,那么首先就了解这些东西,作者以前博客已经进行了详细的说明,
可以看出压缩比越高,压缩时间越长,压缩比:Snappy < LZ4 < LZO < GZIP < BZIP2
gzip:
优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便。 缺点:不支持split。
lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便。 缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)。
snappy压缩
优点:压缩速度快;支持hadoop native库。 缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令。
bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。 缺点:压缩/解压速度慢;不支持native。 ————————————————
6.MAP JOIN
MapJoin.png
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。
(1)通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。 (2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。 也就是在map端进行join避免了shuffle。 ————————————————
7.引擎的选择
代码语言:javascript复制Hive可以使用ApacheTez执行引擎而不是古老的Map-Reduce引擎。
我不会详细讨论在这里提到的使用Tez的许多好处; 相反,我想提出一个简单的建议:
如果它没有在您的环境中默认打开,请在您的Hive查询的开头将以下内容设置为'true'来使用Tez:
设置hive.execution.engine = tez;
通过上述设置,您执行的每个HIVE查询都将利用Tez。
目前Hive On Spark还处于试验阶段,慎用。。
8.Use Vectorization
代码语言:javascript复制向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提高扫描,聚合,筛选器和连接等操作的性能。
在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用:
设置hive.vectorized.execution.enabled = true;
设置hive.vectorized.execution.reduce.enabled = true;
9.cost based query optimization
代码语言:javascript复制Hive 自0.14.0开始,加入了一项”Cost based Optimizer”来对HQL执行计划进行优化,这个功能通
过”hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个JOIN的顺序,并
选择合适的JOIN算法.
Hive在提交最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成。
根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始处设置以下参数:
设置hive.cbo.enable = true;
设置hive.compute.query.using.stats = true;
设置hive.stats.fetch.column.stats = true;
设置hive.stats.fetch.partition.stats = true;
10.模式选择
本地模式
对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。 对于小数据,执行时间可以明显被缩短。通过set hive.exec.mode.local.auto=true(默认为false)设置本地模式。 hive> set hive.exec.mode.local.auto; hive.exec.mode.local.auto=false
并行模式
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。 默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全互相依赖, 即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数:set hive.exec.parallel=true,或者通过配置文件来完成。 hive> set hive.exec.parallel; hive.exec.parallel=false
严格模式
Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询,通过设置 Hive.mapred.modestrict来完成 set Hive.mapred.modestrict; Hive.mapred.modestrict is undefined
11.JVM重用
Hadoop通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销, 尤其是执行的job包含偶成百上千的task任务的情况。JVM重用可以使得JVM示例在同一个job中时候使用N此。 通过参数mapred.job.reuse.jvm.num.tasks来设置。
12.推测执行
Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源, 不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。
Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true ————————————————
Hive日常调优参数汇总
--压缩配置: -- map/reduce 输出压缩(一般采用序列化文件存储) set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapred.output.compression.type=BLOCK;
--任务中间压缩 set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK;
--优化 set hive.exec.dynamic.partition.mode=nonstrict;--设置非严格模式 set hive.exec.dynamic.partition=true;--设置动态分区 set hive.exec.max.dynamic.partitions.pernode=1000;--设置动态分区每个节点最多可划分为多少个分区 set hive.exec.max.dynamic.partitions=2000;--设置动态分区时的分区最大数量 set mapred.reduce.tasks = 20;--设置reduce的任务数量,可用于优化插入分区表时的执行效率 set hive.exec.reducers.max=100;--设置reduce最大数量 set spark.executor.cores=4;--设置每个executor用的core set spark.executor.memory=8g;--设置每个executor的内存大小 set mapreduce.map.memory.mb=8192;--设置map任务的内存大小(container大小) set mapreduce.reduce.memory.mb=8192;--设置reduce任务使用内存大小 set mapred.reduce.child.java.opts=-server -Xmx4000m -Djava.net.preferIPv4Stack=true; --map端内存溢出可以参考下面两个参数 set mapred.map.child.java.opts=-server -Xmx2048m -Djava.net.preferIPv4Stack=true; set mapreduce.map.child.java.opts="-Xmx3072m" set hive.execution.engine=mr;--设置执行hive引擎为mr set hive.merge.mapredfiles= true;--合并小文件 set hive.merge.mapfiles = true; set hive.merge.size.per.task = 256000000; set hive.merge.smallfiles.avgsize = 256000000; set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; set hive.optimize.cp = true; set hive.exec.parallel=true; set hive.exec.parallel.thread.number=8; set mapreduce.job.running.map.limit=10;--限制map运行数量 set hive.mapjoin.smalltable.filesize=26214400;--默认是25M set hive.exec.max.created.files = 200000;--增大hive文件创建数量 set yarn.app.mapreduce.am.resource.mb=4096; set hive.tez.java.opts=-Xmx8192m -XX:MaxPermSize=256m; SET hive.tez.container.size=10240; set mapreduce.input.fileinputformat.split.maxsize=256000000; set mapreduce.input.fileinputformat.split.minsize.per.node=256000000; set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000; set hive.exec.reducers.bytes.per.reducer=5120000000;--设置每个reducer处理的数据 --总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. -- BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). -- ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). -- HYBRID chooses between the above strategies based on heuristics."), set hive.exec.orc.split.strategy=BI;(ORC split generation failed with exception: java.lang.OutOfMemoryError) set hive.mapjoin.localtask.max.memory.usage=0.999;--本地任务可以使用内存的百分比 默认值:0.90 -- map join做group by操作时,可使用多大的内存来存储数据。若数据太大则不会保存在内存里 默认值:0.55 set hive.mapjoin.followby.gby.localtask.max.memory.usage;
--本地mr设置 set hive.exec.mode.local.auto=true; --开启本地mr --设置local mr的最大输入数据量,当输入数据量小于这个值的时候会采用local mr的方式 set hive.exec.mode.local.auto.inputbytes.max=50000000; --设置local mr的最大输入文件个数,当输入文件个数小于这个值的时候会采用local mr的方式 set hive.exec.mode.local.auto.tasks.max=10; --当这三个参数同时成立时候,才会采用本地mr
set mapreduce.map.java.opts=-Xmx4096m -XX:-UseGCOverheadLimit -- GC overhead limit exceeded
set io.sort.mb=1024; --采样 set hive.limit.optimize.enable=true --- 开启对数据源进行采样的功能 set hive.limit.row.max.size --- 设置最小的采样容量 set hive.limit.optimize.limit.file --- 设置最大的采样样本数
set mapred.max.split.size=134217728; --决定每个map处理的最大的文件大小,可以根据总文件大小以及这个参数的设置调整map的数量,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小 set mapred.min.split.size.per.node=1024000000;--每个节点,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小 set mapred.min.split.size.per.rack=1024000000;--每个机架 --mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack set hive.auto.convert.join=true; --hive自动识别小表,小表自动加载到内存,reduce端Common Join 转化为map join,可解决数据倾斜问题,map端jpoin --不产生shuffle set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置 set hive.optimize.skewjoin = true;--如果是join 过程出现倾斜 应该设置为true,hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成(处理的总记录数/reduce个数)的2-4倍都可以接受 set hive.groupby.mapaggr.checkinterval=100000;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置(map端聚合操作的记录条数) set hive.map.aggr.hash.min.reduction=0.5;--解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。 set hive.auto.convert.join.noconditionaltask=True;--将多个map join合并为一个,Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个 set hive.auto.convert.join.noconditionaltask.size=100000000;--多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。 set hive.groupby.skewindata=false;--当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中, --Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key --有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到 --Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。 set hive.optimize.index.filter=true;--自动使用索引,使用聚合索引优化group by操作,如果是orc表,可以使用orc的索引,加快读取hive表的数据 set mapreduce.job.reduce.slowstart.completedmaps=0.8;--当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05,需要特别注意的是, --在JobImpl中,如果处于Uber模式下,会将mapreduce.job.reduce.slowstart.completedmaps参数设置为1,这很好理解,因为不管Map Task,还是Reduce Task,均是串行执行的,所以当Map Task完成的比例达到多少值后才会为Reduce Task申请资源,这个值百分百应该是1
set hive.new.job.grouping.set.cardinality = 30;--grouping sets 数量较多时即cube维度过多,这条设置的意义在于告知解释器,group by之前,每条数据复制量在30份以内。 -- 关闭hive推测执行 set hive.mapred.reduce.tasks.speculative.execution = false; set mapreduce.map.speculative = false; set mapreduce.reduce.speculative = false;
--hive on spark
set spark.executor.memory=4g; set spark.executor.cores=2; set spark.executor.instances=50; set spark.serializer=org.apache.spark.serializer.KryoSerializer; set spark.default.parallelism = 300; set spark.locality.wait = 6; set spark.locality.wait.process=6; set spark.locality.wait.node=6; set spark.locality.wait.rack=6; set spark.shuffle.consolidateFiles=true;--map端文件合并 set spark.shuffle.memoryFraction=0.5;
set mapreduce.map.java.opts=-Xmx2000m -XX:-UseGCOverheadLimit
--map倾斜(数据量大且map分配数据量不合理) set hive.exec.parallel=true; set hive.exec.parallel.thread.number=2; set hive.groupby.skewindata=true; set mapred.max.split.size=256000000; set mapred.min.split.size.per.node=256000000; set mapred.min.split.size.per.rack=256000000; set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 当mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.minsize参数决定。 -- 当mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由dfs.blockSize配置决定。(第二次优化符合此种情况) -- 当dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.maxsize参数决定。 --maxSplitSize > minSplitSizeNode > minSplitSizeRack
set mapreduce.input.fileinputformat.split.maxsize=256000000;--map端文件切片大小 set mapreduce.input.fileinputformat.split.minsize.per.node=256000000;--同一节点的数据块形成切片时,切片大小的最小值 set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;--同一机架的数据块形成切片时,切片大小的最小值 ----reduce端小文件合并(即MR任务结束后进行merge) set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; set hive.merge.size.per.task=256000000; set hive.merge.smallfiles.avgsize=256000000;
set dfs.namenode.handler.count=20;--设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。 https://blog.csdn.net/turk/article/details/79723963 set mapreduce.task.timeout=36000000;--MapReduce设置参数防止超时 --COST BASED QUERY OPTIMIZATION(CBO) cbo可以优化hive的每次查询,使用CBO,需要开启下面四个选项。 set hive.cbo.enable=true;--如果数据已经根据相同的key做好聚合,则去除多余的map/reduce作业 set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true; set hive.stats.fetch.partition.stats=true;
set mapreduce.reduce.shuffle.parallelcopies=10; shuffle开启的fetcher线程数 apache默认5,choudera默认10 -- shuffle使用的内存比例。 mapreduce.reduce.shuffle.input.buffer.percent=0.6:--可以从0.2开始向上调。当只有20%的heap size分配给shuffle buffer的时候不容易出现OOM。 -- 单个shuffle任务能使用的内存限额,设置为0.15,即为 Shuffle内存 * 0.15。 -- 低于此值可以输出到内存,否则输出到磁盘。 mapreduce.reduce.shuffle.memory.limit.percent: -- shuffle的数据量到Shuffle内存 ** 0.9的时候,启动合并。 mapreduce.reduce.shuffle.merge.percent:设置为0.9。
set mapreduce.reduce.shuffle.memory.limit.percent=0.1; set hive.map.aggr.hash.percentmemory = 0.25;--Hive Map 端聚合的哈稀存储所占用虚拟机的内存比例。 当内存的Map大小,占到JVM配置的Map进程的25%的时候(默认是50%),就将这个数据flush到reducer去,以释放内存Map的空间。 set hive.map.aggr.hash.force.flush.memory.threshold=0.9 --map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush set hive.ignore.mapjoin.hint=false; --(默认值:true;是否忽略mapjoin hint 即HQL 语句中的 mapjoin 标记) set hive.auto.convert.join.noconditionaltask=true; --(默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin) set hive.auto.convert.join.noconditionaltask.size=60000000;--(将多个mapjoin转化为一个mapjoin时,其表的最大值) set hive.stats.autogather=false;--即插入数据时会优化统计,如此在大的动态分区时load数据后会有一段很长时间的统计,且操作hive元数据表,例如每个分区的文件数,行数等等。耗时比较长时可能会timeout,需要将其设成false。
-- Hadoop任务可能引起OOM错误的原因有很多。一般情况下,首先检查是否重设了hadoop参数:mapred.child.java.opts,一般设为-Xmx2000m,即使用2G的最大堆内存。 -- Hive中可能引起OOM的原因及相关的修复设定如下表所示:
-- 原因:map aggregation -- map aggregation使用哈希表存储group by/distinct key和他们的aggregation结果。 -- aggregate结果字段过多,或group by/distinct key的散度过大,可能导致内存占用过多。 -- 修复: -- 减小hive.map.aggr.hash.percentmemory设定(默认为0.5,即使用50%的child堆内存)。
-- 原因:join -- join需要cache所有相同join key的非驱动表的记录 -- 修复: -- 检查是否把大表设定为驱动表(大表写在join的最右边)。 -- 如果已经设定正确的驱动表,减小hive.join.emit.interval设定(默认为1000,即每1000行的join结果集输出一次)。
-- 原因:map join -- map join需要cache全部小表的所有数据 -- 修复: -- 检查小表是否足够小。如果小表超过1G,考虑不要使用map join。