大厂都在用的Hive优化

2021-03-05 14:35:19 浏览数 (1)

前言

Hive作为大数据分析领域常用的仓库工具,即使是现在流式计算如火如荼背景下,Hive依然倍受各大厂商挚爱。使用Hive过程中,面对各种各样的查询需求,需要具有针对性的优化下面内容就给大家分别介绍下。

1. 启用压缩

压缩可以使磁盘上的数据量变小,例如,文本文件格式能够压缩40%甚至更高的比例,这样可以通过降低I/O来提高查询速度。除非产生的数据用于外部系统,或者存在格式兼容性问题,建议总是启用压缩。压缩与解压缩会消耗CPU资源,但是Hive产生的作业往往是IO密集型的。因此CPU开销通常不是问题。

1.1 查询出可用的编解码器:

代码语言:javascript复制
set io.compression.codecs;

结果如下:

代码语言:javascript复制
io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,
          org.apache.hadoop.io.compress.DefaultCodec,
          org.apache.hadoop.io.compress.BZip2Codec,
          org.apache.hadoop.io.compress.SnappyCodec

1.2 启用中间数据压缩。

一个复杂的Hive查询再提交后,通常被转化成一系列中间阶段的MapReduce作业,Hive引擎将这些作业串联起来完成整个查询。可以将这些中间数据进行压缩。这里所说的中间数据指的是上一个MR作业的输出,这个输出将会被下一个MR作业作为输入数据使用。

代码语言:javascript复制
 set hive.exec.compress.intermediate=true;
 set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
 set hive.intermediate.compression.type=BLOCK;

1.3 启用结果压缩

当Hive输出接入到表中时,输出内容同样可以进行压缩。

代码语言:javascript复制
 set hive.exec.compress.output=true;
 set mapreduce.output.fileoutputformat.compress=true;
 set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
 set mapreduce.output.fileoutputformat.compress.type=BLOCK;

2. 优化连接查询

可以通过配置Map连接和倾斜连接的相关属性提升连接查询的性能。

2.1 自动Map连接

当连接一个大表和一个小表时,自动Map连接是一个非常有用特性。如果启动该特性,小表将保存在每个节点的本地缓存中,并在Map节点与大表进行连接。开启自动Map连接提供了两个好处。首先,将小标装进缓存将节省每个数据节点上的读取时间。其次,它避免了Hive查询中的倾斜连接,因为每个数据块的连接操作已经在Map阶段完成了。

代码语言:javascript复制
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=100000000;
set hive.auto.convert.join.use.nonstaged=true;

其中:

  • hive.auto.convert.join:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。
  • hive.auto.convert.join.noconditionaltask:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。假设参与连接的表(或分区)有N个,如果打开这个 参数,并且有N-1个表(或分区)的大小总和小于hive.auto.convert.join.noconditionaltask.size参数指定的值,那么会直接将连接转为Map连接。
  • hive.auto.convert.join.noconditionaltask.size:如果hive.auto.convert.join.noconditionaltask是关闭的,则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于这个参数的值,则直接将连接转为Map连接。默认值为10MB。
  • hive.auto.convert.join.use.nonstaged:对于条件连接,如果从一个小的输入流可以直接应用于join操作而不需要过滤或者投影,那么不需要通过MapReduce的本地任务在 分布式缓存中预存。当前该参数在vectorization或tez执行引擎中不工作。

2.2 倾斜连接

两个大表连接时,会先基于连接键分别对两个表进行排序,然后连接它们。Mapper将特定键值的所有行发送给同一个Reducer。例如:表A的id列有1,2,3,4四个值,表B的id有1,2,3三个值,查询语句如下:

代码语言:javascript复制
select A.id from A join B on A.id = B.id

一系列Mapper读取表中的数据并基于键发送给Reducer。如id=1行进入Reducer R1,id = 2的行进入Reducer R2的行等。这些Reducer产生A B的交集并输出。Reducer R4只从A获取行,不产生查询结果。

现在假设id=1的数据行是高度倾斜的,则R2和R3会很快完成,而R1需要很长时间,将成为整个查询的瓶颈。配置倾斜连接的相关属性可以有效优化倾斜连接。

代码语言:javascript复制
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=1000000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

其中:

  • hive.optimize.skewjoin:是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其他键值生成各自的查询计划。
  • hive.skewjoin.key:决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。
  • hive.skewjoin.mapjoin.map.tasks:指定倾斜连接中,用于Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一起使用,执行细粒度的控制。
  • hive.skewjoin.mapjoin.min.split:通过指定最小split的大小,确定Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一起使用,执行细粒度的控制。

2.3 桶Map连接

如果连接中使用的表是特定列分桶的,可以开启桶Map连接提升性能。

代码语言:javascript复制
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;

其中:

  • hive.optimize.bucketmapjoin:是否尝试桶Map连接。
  • hive.optimize.bucketmapjoin.sortedmerge:是否尝试在Map连接中使用归并排序。

3. 避免使用Order by 全局排序

Hive中使用order by 子句实现全局排序,order by只用一个reduce产生结果,对于大数据集,这种做法效率很低。如果不需要全局有序,则可以使用sort by子句,该子句为每个reduce生成一个排好序的文件。如果需要控制一个特定数据行流向哪个reducer,可以使用用distribute by 子句。如:

代码语言:javascript复制
select id,name,salary, dept from employee distribute by dept sort by id asc, name desc;

属于一个dept的数据会分配到一个reducer进行处理,同一个dept的所有记录会按照id,name列排序。最终的结果集是全局有序的。

4. 启用Tex或者Spark执行引擎。

代码语言:javascript复制
set hive.execution.engine=tex;
或者
set hive.execution.engine=spark;

5. 优化Limit操作

默认时limit操作仍然会执行整个查询,然后返回限定的行数。在有些情况下这种处理方式很浪费,因此可以通过设置下面的属性避免此行为。

代码语言:javascript复制
set hive.limit.optimize.enable=true;
set hive.limit.row.max.size=true;
set hive.limit.optimize.limit.file=10;
set hive.limit.optimize.fetch.max=50000;

其中:

  • hive.limit.optimize.enable:是否启用limit优化。当使用limit语句时,对源数据进行抽样。
  • hive.limit.row.max.size:在使用limit做数据的子集查询时保证的最大行数据量。
  • hive.limit.optimize.limit.file:在使用limit做数据子集查询时,采样的最大文件数。
  • hive.limit.optimize.fetch.max:使用简单limit数据抽样时,允许的最大行数。

6. 启用并行执行

每条Hive SQL语句都被转化成一个或者多个阶段执行,可能是一个MapReduce阶段,采样阶段,限制阶段等。默认时,Hive在任意时刻只能执行其中一个阶段。如果组成一个特定作业的多个执行阶段是彼此独立的,那么它们可以并行执行,从而整个作业得以更快完成。设置下面的属性启用并执行。

代码语言:javascript复制
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;

其中:

  • hive.exec.parallel:是否并行执行作业。
  • hive.exec.parallel.thread.number:最多可以并行执行的作业数。

7. 启用MapReduce严格模式

Hive提供了一个严格模式,可以防止用户执行那些可能产生负面影响的查询。通过设置下面的属性启用MapReduce严格模式。

代码语言:javascript复制
set hive.mapred.mode=strict

严格模式禁止3种类型的查询

  • 对于分区表,where子句中不包含分区字段过滤条件的查询不允许执行。
  • 对于使用了order by子句的查询,要求必须使用limit子句,否则不允许执行。
  • 限制笛卡尔积查询。

8. 使用单一Reducer执行多个Group By

通过为group by操作开启单一reduce任务属性,可以将一个查询中的多个group by操作联合发送给单一MapReduce作业。

代码语言:javascript复制
set hive.multigroupby.singlereducer=true;

9. 控制并行Reduce任务

Hive通过将查询任务分成一个或者多个MapReduce任务达到并行的目的。确定最佳的mapper个数和reducer个数取决于多个变量,例如输入的数据量以及对这些数据执行的操作类型等。如果有太多的mapper或者reducer任务,会导致启动、调度和运行作业过程产生过多的开销,而设置的数量太少,那么就可能没有重分利用好集群内在的并发性。对于一个Hive查询,可以设置下面的属性来控制并行reducer任务的个数。

代码语言:javascript复制
set hive.exec.reducers.bytes.per.reducer=256000000;
set hive.exec.reducers.max=1009;

其中:

  • hive.exec.reducers.bytes.per.reducer:每个reducer的字节数,默认值为256MB。Hive是按照输入的数据量大小来确定reducer个数的。例如,如果输入的数据是1GB,将 使用4个reducer。
  • hive.exec.reducers.max:将会使用的最大reducer个数。

10. 启用向量化

向量化特性在Hive 0.13.1版本中被首次引入。通过查询执行向量化,使Hive从单行处理数据改为批量处理方式,具体来说是一次1024行而不是原来的每次只处理一行,这大大提升了指令流水线和缓存的利用率,从而提高了扫描、聚合、过滤和链接等操作的性能。可以设置下面的属性启用查询执行向量化。

代码语言:javascript复制
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.vectorized.execution.reduce.groupby.enabled=true;

其中:

  • hive.vectorized.execution.enabled:如果该标志设置为true,则开启查询执行的向量模式,默认值为false。
  • hive.vectorized.execution.reduce.enabled:如果该标志设置为true,则开启查询执行reduce端的向量模式,默认值为true。
  • hive.vectorized.execution.reduce.groupby.enabled:如果该标志设置为true,则开启查询执行reduce端group by操作的向量模式,默认值为true。

11. 启用基于成本的优化器

Hive 0.14 版本开始提供基于成本优化器(CBO)特性。使用过Oracle数据库的同学对CBO一定不会陌生。与Oracle类似,Hive的CBO也可以根据查询成本指定执行计划,例如确定表链接的顺序、以何种方式执行链接、使用的并行度等。设置下面的属性启用基于成本优化器。

代码语言:javascript复制
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.partition.stats=true;
set hive.stats.fetch.column.stats=true;

其中:

  • hive.cbo.enable:控制是否启用基于成本的优化器,默认值是true。Hive的CBO使用Apache Calcite框架实现。
  • hive.compute.query.using.stats:该属性的默认值为false。如果设置为true,Hive在执行某些查询时,例如select count(1),只利用元数据存储中保存的状态信息返回结果。为了收集基本状态信息,需要将hive.stats.autogather属性配置为true。为了收集更多的状态信息,需要运行analyzetable查询命令。
  • hive.stats.fetch.partition.stats:该属性的默认值为true。操作树中所标识的统计信息,需要分区级别的基本统计,如每个分区的行数、数据量大小和文件大小等。分区 统计信息从元数据存储中获取。如果存在很多分区,要为每个分区收集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取分区统计。当 该标志设置为false时,Hive从文件系统获取文件大小,并根据表结构估算行数。
  • hive.stats.fetch.column.stats:该属性的默认值为false。操作树中所标识的统计信息,需要列统计。列统计信息从元数据存储中获取。如果存在很多列,要为每个列收 集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取列统计。

可以使用HQL的analyze table语句收集一个表中所有列相关的统计信息,例如下面的语句收集sales_order_face表的统计信息。

代码语言:javascript复制
analyze table sales_order_fact compute statistics for climuns;
analyze table sales_order_fact compute statistics for columns order_number,customer_sk;

12. 使用合适的存储格式

Hive支持多种数据格式,如textFile、sequenceFile、RCFFile、ORCFile等。在合适的场景下使用合适的存储格式,有助于提升查询性能。

下面是选择存储格式建议的场景:

  • 如果数据有参数的分隔符,那么可以选择TEXTFILE格式。
  • 如果数据所在文件比块尺寸小、可以选择SEQUCEFILE格式。
  • 如果想执行数据分析,并高效地存储数据,可以选择RCFFILE。
  • 如果希望减少数据所需要的存储空间并提升性能,可以选择ORCFILE。

彩蛋

资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板,优质的文章等资源请去 下方链接获取GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下载 https://gitee.com/li_hey_hey/dashboard/projects

代码语言:javascript复制
代码语言:javascript复制
-End-

0 人点赞