(六)Hive优化

2020-09-20 19:45:08 浏览数 (1)

小文件问题的影响 1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。

2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。 ————————————————

小文件问题的解决方案 从小文件产生的途经就可以从源头上控制小文件数量,方法如下:

代码语言:javascript复制
1.使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件。

2.减少reduce的数量(可以使用参数进行控制)。

3.少用动态分区,用时记得按distribute by分区。

————————————————

对于已有的小文件,我们可以通过以下几种方案解决:

代码语言:javascript复制
1.使用hadoop archive命令把小文件进行归档。

2.重建表,建表时减少reduce数量。

3.通过参数进行调节,设置map/reduce端的相关参数,如下:

设置map输入合并小文件的相关参数:

[java] view plain copy //每个Map最大输入大小(这个值决定了合并后文件的数量) set mapred.max.split.size=256000000; //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; //执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

设置map输出和reduce输出进行合并的相关参数: [java] view plain copy //设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 25610001000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000

————————————————

3.Write good SQL :

说道sql优化很惭愧,自己sql很烂,不多比比了,但是sql优化确实很关键。。。

4.存储格式:

可以使用列裁剪,分区裁剪,orc,parquet等存储格式。

代码语言:javascript复制
Hive支持ORCfile,这是一种新的表格存储格式,通过诸如谓词下推,压缩等技术来提高执行速度提升。
对于每个HIVE表使用ORCFile应该是一件容易的事情,并且对于获得HIVE查询的快速响应时间非常有益。
作为一个例子,考虑两个大表A和B(作为文本文件存储,其中一些列未在此处指定,即行试存储的缺点)以及一个简单的查询,如:
SELECT A.customerID, A.name, A.age, A.address join
B.role, B.department, B.salary
ON A.customerID=B.customerID;
此查询可能需要很长时间才能执行,因为表A和B都以TEXT形式存储,进行全表扫描。
将这些表格转换为ORCFile格式通常会显着减少查询时间:

  
  
ORC支持压缩存储(使用ZLIB或如上所示使用SNAPPY),但也支持未压缩的存储。
    CREATE TABLE A_ORC (
    customerID int, name string, age int, address string
    ) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);

    INSERT INTO TABLE A_ORC SELECT * FROM A;


    CREATE TABLE B_ORC (
    customerID int, role string, salary float, department string
    ) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);

    INSERT INTO TABLE B_ORC SELECT * FROM B;

    SELECT A_ORC.customerID, A_ORC.name,
    A_ORC.age, A_ORC.address join
    B_ORC.role, B_ORC.department, B_ORC.salary
    ON A_ORC.customerID=B_ORC.customerID;

5.压缩格式:

压缩格式

UNIX工具

算 法

文件扩展名

可分割

DEFLATE

DEFLATE

.deflate

No

gzip

gzip

DEFLATE

.gz

No

LZ4

LZ4

.LZ4

NO

bzip

bzip

bzip

.bz2

YES

LZO

lzop

LZO

.lzo

YES if indexed

Snappy

Snappy

.snappy

NO

Size(MB).png

Time(s).png

大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,那么首先就了解这些东西,作者以前博客已经进行了详细的说明,

可以看出压缩比越高,压缩时间越长,压缩比:Snappy < LZ4 < LZO < GZIP < BZIP2

gzip:

优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便。 缺点:不支持split。

lzo压缩

优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便。 缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)。

snappy压缩

优点:压缩速度快;支持hadoop native库。 缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令。

bzip2压缩

优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。 缺点:压缩/解压速度慢;不支持native。 ————————————————

6.MAP JOIN

MapJoin.png

MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。

(1)通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。 (2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。 也就是在map端进行join避免了shuffle。 ————————————————

7.引擎的选择

代码语言:javascript复制
Hive可以使用ApacheTez执行引擎而不是古老的Map-Reduce引擎。 
我不会详细讨论在这里提到的使用Tez的许多好处; 相反,我想提出一个简单的建议:
如果它没有在您的环境中默认打开,请在您的Hive查询的开头将以下内容设置为'true'来使用Tez:
设置hive.execution.engine = tez;
通过上述设置,您执行的每个HIVE查询都将利用Tez。
目前Hive On Spark还处于试验阶段,慎用。。

8.Use Vectorization

代码语言:javascript复制
向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提高扫描,聚合,筛选器和连接等操作的性能。
在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用:
设置hive.vectorized.execution.enabled = true;
设置hive.vectorized.execution.reduce.enabled = true;

9.cost based query optimization

代码语言:javascript复制
Hive 自0.14.0开始,加入了一项”Cost based Optimizer”来对HQL执行计划进行优化,这个功能通  
过”hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个JOIN的顺序,并
选择合适的JOIN算法.
Hive在提交最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成。
根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始处设置以下参数:
设置hive.cbo.enable = true;

设置hive.compute.query.using.stats = true;

设置hive.stats.fetch.column.stats = true;

设置hive.stats.fetch.partition.stats = true;

10.模式选择

本地模式

对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。 对于小数据,执行时间可以明显被缩短。通过set hive.exec.mode.local.auto=true(默认为false)设置本地模式。 hive> set hive.exec.mode.local.auto; hive.exec.mode.local.auto=false

并行模式

Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。 默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全互相依赖, 即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数:set hive.exec.parallel=true,或者通过配置文件来完成。 hive> set hive.exec.parallel; hive.exec.parallel=false

严格模式

Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询,通过设置 Hive.mapred.modestrict来完成 set Hive.mapred.modestrict; Hive.mapred.modestrict is undefined

11.JVM重用

Hadoop通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销, 尤其是执行的job包含偶成百上千的task任务的情况。JVM重用可以使得JVM示例在同一个job中时候使用N此。 通过参数mapred.job.reuse.jvm.num.tasks来设置。

12.推测执行

Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源, 不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。

Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置

mapred.map.tasks.speculative.execution=true

mapred.reduce.tasks.speculative.execution=true ————————————————

Hive日常调优参数汇总

--压缩配置: -- map/reduce 输出压缩(一般采用序列化文件存储) set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapred.output.compression.type=BLOCK;

--任务中间压缩 set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK;

--优化 set hive.exec.dynamic.partition.mode=nonstrict;--设置非严格模式 set hive.exec.dynamic.partition=true;--设置动态分区 set hive.exec.max.dynamic.partitions.pernode=1000;--设置动态分区每个节点最多可划分为多少个分区 set hive.exec.max.dynamic.partitions=2000;--设置动态分区时的分区最大数量 set mapred.reduce.tasks = 20;--设置reduce的任务数量,可用于优化插入分区表时的执行效率 set hive.exec.reducers.max=100;--设置reduce最大数量 set spark.executor.cores=4;--设置每个executor用的core set spark.executor.memory=8g;--设置每个executor的内存大小 set mapreduce.map.memory.mb=8192;--设置map任务的内存大小(container大小) set mapreduce.reduce.memory.mb=8192;--设置reduce任务使用内存大小 set mapred.reduce.child.java.opts=-server -Xmx4000m -Djava.net.preferIPv4Stack=true; --map端内存溢出可以参考下面两个参数 set mapred.map.child.java.opts=-server -Xmx2048m -Djava.net.preferIPv4Stack=true; set mapreduce.map.child.java.opts="-Xmx3072m" set hive.execution.engine=mr;--设置执行hive引擎为mr set hive.merge.mapredfiles= true;--合并小文件 set hive.merge.mapfiles = true; set hive.merge.size.per.task = 256000000; set hive.merge.smallfiles.avgsize = 256000000; set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; set hive.optimize.cp = true; set hive.exec.parallel=true; set hive.exec.parallel.thread.number=8; set mapreduce.job.running.map.limit=10;--限制map运行数量 set hive.mapjoin.smalltable.filesize=26214400;--默认是25M set hive.exec.max.created.files = 200000;--增大hive文件创建数量 set yarn.app.mapreduce.am.resource.mb=4096; set hive.tez.java.opts=-Xmx8192m -XX:MaxPermSize=256m; SET hive.tez.container.size=10240; set mapreduce.input.fileinputformat.split.maxsize=256000000; set mapreduce.input.fileinputformat.split.minsize.per.node=256000000; set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000; set hive.exec.reducers.bytes.per.reducer=5120000000;--设置每个reducer处理的数据 --总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. -- BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). -- ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). -- HYBRID chooses between the above strategies based on heuristics."), set hive.exec.orc.split.strategy=BI;(ORC split generation failed with exception: java.lang.OutOfMemoryError) set hive.mapjoin.localtask.max.memory.usage=0.999;--本地任务可以使用内存的百分比 默认值:0.90 -- map join做group by操作时,可使用多大的内存来存储数据。若数据太大则不会保存在内存里 默认值:0.55 set hive.mapjoin.followby.gby.localtask.max.memory.usage;

--本地mr设置 set hive.exec.mode.local.auto=true; --开启本地mr --设置local mr的最大输入数据量,当输入数据量小于这个值的时候会采用local mr的方式 set hive.exec.mode.local.auto.inputbytes.max=50000000; --设置local mr的最大输入文件个数,当输入文件个数小于这个值的时候会采用local mr的方式 set hive.exec.mode.local.auto.tasks.max=10; --当这三个参数同时成立时候,才会采用本地mr

set mapreduce.map.java.opts=-Xmx4096m -XX:-UseGCOverheadLimit -- GC overhead limit exceeded

set io.sort.mb=1024; --采样 set hive.limit.optimize.enable=true --- 开启对数据源进行采样的功能 set hive.limit.row.max.size --- 设置最小的采样容量 set hive.limit.optimize.limit.file --- 设置最大的采样样本数

set mapred.max.split.size=134217728; --决定每个map处理的最大的文件大小,可以根据总文件大小以及这个参数的设置调整map的数量,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小 set mapred.min.split.size.per.node=1024000000;--每个节点,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小 set mapred.min.split.size.per.rack=1024000000;--每个机架 --mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack set hive.auto.convert.join=true; --hive自动识别小表,小表自动加载到内存,reduce端Common Join 转化为map join,可解决数据倾斜问题,map端jpoin --不产生shuffle set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置 set hive.optimize.skewjoin = true;--如果是join 过程出现倾斜 应该设置为true,hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成(处理的总记录数/reduce个数)的2-4倍都可以接受 set hive.groupby.mapaggr.checkinterval=100000;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置(map端聚合操作的记录条数) set hive.map.aggr.hash.min.reduction=0.5;--解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。 set hive.auto.convert.join.noconditionaltask=True;--将多个map join合并为一个,Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个 set hive.auto.convert.join.noconditionaltask.size=100000000;--多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。 set hive.groupby.skewindata=false;--当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中, --Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key --有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到 --Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。 set hive.optimize.index.filter=true;--自动使用索引,使用聚合索引优化group by操作,如果是orc表,可以使用orc的索引,加快读取hive表的数据 set mapreduce.job.reduce.slowstart.completedmaps=0.8;--当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05,需要特别注意的是, --在JobImpl中,如果处于Uber模式下,会将mapreduce.job.reduce.slowstart.completedmaps参数设置为1,这很好理解,因为不管Map Task,还是Reduce Task,均是串行执行的,所以当Map Task完成的比例达到多少值后才会为Reduce Task申请资源,这个值百分百应该是1

set hive.new.job.grouping.set.cardinality = 30;--grouping sets 数量较多时即cube维度过多,这条设置的意义在于告知解释器,group by之前,每条数据复制量在30份以内。 -- 关闭hive推测执行 set hive.mapred.reduce.tasks.speculative.execution = false; set mapreduce.map.speculative = false; set mapreduce.reduce.speculative = false;

--hive on spark

set spark.executor.memory=4g; set spark.executor.cores=2; set spark.executor.instances=50; set spark.serializer=org.apache.spark.serializer.KryoSerializer; set spark.default.parallelism = 300; set spark.locality.wait = 6; set spark.locality.wait.process=6; set spark.locality.wait.node=6; set spark.locality.wait.rack=6; set spark.shuffle.consolidateFiles=true;--map端文件合并 set spark.shuffle.memoryFraction=0.5;

set mapreduce.map.java.opts=-Xmx2000m -XX:-UseGCOverheadLimit

--map倾斜(数据量大且map分配数据量不合理) set hive.exec.parallel=true; set hive.exec.parallel.thread.number=2; set hive.groupby.skewindata=true; set mapred.max.split.size=256000000; set mapred.min.split.size.per.node=256000000; set mapred.min.split.size.per.rack=256000000; set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 当mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.minsize参数决定。 -- 当mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由dfs.blockSize配置决定。(第二次优化符合此种情况) -- 当dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.maxsize参数决定。 --maxSplitSize > minSplitSizeNode > minSplitSizeRack

set mapreduce.input.fileinputformat.split.maxsize=256000000;--map端文件切片大小 set mapreduce.input.fileinputformat.split.minsize.per.node=256000000;--同一节点的数据块形成切片时,切片大小的最小值 set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;--同一机架的数据块形成切片时,切片大小的最小值 ----reduce端小文件合并(即MR任务结束后进行merge) set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; set hive.merge.size.per.task=256000000; set hive.merge.smallfiles.avgsize=256000000;

set dfs.namenode.handler.count=20;--设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。 https://blog.csdn.net/turk/article/details/79723963 set mapreduce.task.timeout=36000000;--MapReduce设置参数防止超时 --COST BASED QUERY OPTIMIZATION(CBO) cbo可以优化hive的每次查询,使用CBO,需要开启下面四个选项。 set hive.cbo.enable=true;--如果数据已经根据相同的key做好聚合,则去除多余的map/reduce作业 set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true; set hive.stats.fetch.partition.stats=true;

set mapreduce.reduce.shuffle.parallelcopies=10; shuffle开启的fetcher线程数 apache默认5,choudera默认10 -- shuffle使用的内存比例。 mapreduce.reduce.shuffle.input.buffer.percent=0.6:--可以从0.2开始向上调。当只有20%的heap size分配给shuffle buffer的时候不容易出现OOM。 -- 单个shuffle任务能使用的内存限额,设置为0.15,即为 Shuffle内存 * 0.15。 -- 低于此值可以输出到内存,否则输出到磁盘。 mapreduce.reduce.shuffle.memory.limit.percent: -- shuffle的数据量到Shuffle内存 ** 0.9的时候,启动合并。 mapreduce.reduce.shuffle.merge.percent:设置为0.9。


set mapreduce.reduce.shuffle.memory.limit.percent=0.1; set hive.map.aggr.hash.percentmemory = 0.25;--Hive Map 端聚合的哈稀存储所占用虚拟机的内存比例。 当内存的Map大小,占到JVM配置的Map进程的25%的时候(默认是50%),就将这个数据flush到reducer去,以释放内存Map的空间。 set hive.map.aggr.hash.force.flush.memory.threshold=0.9 --map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush set hive.ignore.mapjoin.hint=false; --(默认值:true;是否忽略mapjoin hint 即HQL 语句中的 mapjoin 标记) set hive.auto.convert.join.noconditionaltask=true; --(默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin) set hive.auto.convert.join.noconditionaltask.size=60000000;--(将多个mapjoin转化为一个mapjoin时,其表的最大值) set hive.stats.autogather=false;--即插入数据时会优化统计,如此在大的动态分区时load数据后会有一段很长时间的统计,且操作hive元数据表,例如每个分区的文件数,行数等等。耗时比较长时可能会timeout,需要将其设成false。

-- Hadoop任务可能引起OOM错误的原因有很多。一般情况下,首先检查是否重设了hadoop参数:mapred.child.java.opts,一般设为-Xmx2000m,即使用2G的最大堆内存。 -- Hive中可能引起OOM的原因及相关的修复设定如下表所示:

-- 原因:map aggregation -- map aggregation使用哈希表存储group by/distinct key和他们的aggregation结果。 -- aggregate结果字段过多,或group by/distinct key的散度过大,可能导致内存占用过多。 -- 修复: -- 减小hive.map.aggr.hash.percentmemory设定(默认为0.5,即使用50%的child堆内存)。

-- 原因:join -- join需要cache所有相同join key的非驱动表的记录 -- 修复: -- 检查是否把大表设定为驱动表(大表写在join的最右边)。 -- 如果已经设定正确的驱动表,减小hive.join.emit.interval设定(默认为1000,即每1000行的join结果集输出一次)。

-- 原因:map join -- map join需要cache全部小表的所有数据 -- 修复: -- 检查小表是否足够小。如果小表超过1G,考虑不要使用map join。

0 人点赞