知行教育项目_Hive参数优化

2021-04-09 14:57:11 浏览数 (1)

文章目录

    • 4.3 Hive的分区
      • 4.3.1.1 为什么要分区
      • 4.3.1.2 静态分区
      • 4.3.1.3 动态分区
    • 4.5 Hive参数优化(基础)
      • 4.5.2 Yarn基础配置
      • 4.5.2.1.2 内存配置
      • 4.5.3.1 HiveServer2 的 Java 堆栈
    • 4.1.3 Hive分桶
      • 4.1.4 Hive分桶
      • 4.1.4.6.1 大小表关联
      • 4.1.4.7 Bucket-MapJoin

4.3 Hive的分区

我们知道传统的OLTP数据库一般都具有索引和表分区的功能,通过表分区能够在特定的区域检索数据,减少扫描成本,在一定程度上提高查询效率,我们还可以通过建立索引进一步提升查询效率。在Hive数仓中也有索引和分区的概念。

为了对表进行合理的管理以及提高查询效率,Hive可以将表组织成“分区”。 分区是表的部分列的集合,可以为频繁使用的数据建立分区,这样查找分区中的数据时就不需要扫描全表,这对于提高查找效率很有帮助。 分区是一种根据“分区列”(partition column)的值对表进行粗略划分的机制。Hive中每个分区对应着表很多的子目录,将所有的数据按照分区列放入到不同的子目录中去。

4.3.1.1 为什么要分区

庞大的数据集可能需要耗费大量的时间去处理。在许多场景下,可以通过分区的方法减少每一次扫描总数据量,这种做法可以显著地改善性能。 数据会依照单个或多个列进行分区,通常按照时间、地域或者是商业维度进行分区。 比如电影表,分区的依据可以是电影的种类和评级,另外,按照拍摄时间划分可能会得到均匀的结果。 为了达到性能表现的一致性,对不同列的划分应该让数据尽可能均匀分布。最好的情况下,分区的划分条件总是能够对应where语句的部分查询条件,这样才能充分利用分区带来的性能优势。

Hive的分区使用HDFS的子目录功能实现。每一个子目录包含了分区对应的列名和每一列的值。但是由于HDFS并不支持大量的子目录,这也给分区的使用带来了限制。我们有必要对表中的分区数量进行预估,从而避免因为分区数量过大带来一系列问题。 Hive查询通常使用分区的列作为查询条件。这样的做法可以指定MapReduce任务在HDFS中指定的子目录下完成扫描的工作。HDFS的文件目录结构可以像索引一样高效利用。 Hive(Inceptor)分区包括静态分区和动态分区。

4.3.1.2 静态分区

代码语言:javascript复制
根据插入时是否需要手动指定分区可以分为:静态分区:导入数据时需要手动指定分区。动态分区:导入数据时,系统可以动态判断目标分区。

1.创建静态分区 直接在 PARTITIONED BY 后面跟上分区键、类型即可。(分区键不能和任何列重名) 语法:

代码语言:javascript复制
CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type> [, <col_name> <data_type> ...])
    -- 指定分区键和数据类型
    PARTITIONED BY  (<partition_key> <data_type>, ...) 
    [ROW FORMAT <row_format>] 
    [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
[TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
栗子:
--分区字段主要是时间,按年分区
CREATE TABLE device_open (
deviceid varchar(50),
...
)
PARTITIONED BY (year varchar(50))
ROW FORMAT DELIMITED FIELDS TERMINATED BY 't';

2.写入数据 语法: – 覆盖写入

代码语言:javascript复制
INSERT OVERWRITE TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...]) 
    SELECT <select_statement>;

– 追加写入

代码语言:javascript复制
INSERT INTO TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...])
SELECT <select_statement>;

栗子:

代码语言:javascript复制
insert overwrite table device_open partition(year=’2020’)
select
	...,
	original_device_open.month as month,
	original_device_open.day as day,
	original_device_open.hour as hour
FROM original_device_open

4.3.1.3 动态分区

1.创建 创建方式与静态分区表完全一样。 语法:

代码语言:javascript复制
--分区字段主要是时间,分为年,月,日,时
CREATE TABLE device_open (
deviceid varchar(50),
...
)
PARTITIONED BY (year varchar(50), month varchar(50), day varchar(50), hour varchar(50))
ROW FORMAT DELIMITED FIELDS TERMINATED BY 't';

2.写入 动态分区只需要给出分区键名称。 语法:

代码语言:javascript复制
-- 开启动态分区支持,并开启非严格模式
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table device_open partition(year,month,day,hour)
select
	...,
	original_device_open.year as year,
	original_device_open.month as month,
	original_device_open.day as day,
	original_device_open.hour as hour
FROM original_device_open

set hive.exec.dynamic.partition=true; 是开启动态分区 set hive.exec.dynamic.partition.mode=nonstrict; 这个属性默认值是strict,

就是要求分区字段必须有一个是静态的分区值。全部动态分区插入,需要设置为nonstrict非严格模式。 代码中标红的部分,partition(year,month,day,hour) 就是要动态插入的分区。对于大批量数据的插入分区,动态分区相当方便。

4.3.1.4 静态分区和动态分区混用 一张表可同时被静态和动态分区键分区,只是动态分区键需要放在静态分区键的后面(因为HDFS上的动态分区目录下不能包含静态分区的子目录)。 静态分区键要用 = 指定分区值;动态分区只需要给出分区键名称 。 spk 即静态分区static partition key, dpk 即动态分区dynamic partition key。 比如:

代码语言:javascript复制
insert overwrite table device_open partition(year='2017',month='05',day,hour)
select
	...,
	original_device_open.day as day,
	original_device_open.hour as hour
FROM original_device_open 
where original_device_open.year='2017' and original_device_open.month='05'

partition(year=‘2017’, month=‘05’, day, hour),year和month是静态分区字段,day和hour是动态分区字段,这里指将2017年5月份的数据插入分区表,对应底层的物理操作就是将2017年5月份的数据load到hdfs上对应2017年5月份下的所有day和hour目录中去。 注意混用的情况下,静态分区的上层必须也是静态分区,如果partition(year, month, day=’05’, hour=’08’),则会报错:FAILED: SemanticException [Error 10094]: Line 1:50 Dynamic partition cannot be the parent of a static partition ‘‘day’’。 4.3.1.5 有序动态分区 注意,如果个人电脑性能不好,出现因为动态分区而导致的内存溢出问题,可以设置hive.optimize.sort.dynamic.partition进行避免:

设置为true后,当启用动态分区时,reducer仅随时保持一个记录写入程序,从而降低对 reducer产生的内存压力。但同时也会使查询性能变慢。

动态分区其他相关属性设置:

4.5 Hive参数优化(基础)

此课程中关于Hive的优化,皆是基于Hive2.x的版本,对于Hive1.x旧版本的优化机制不再复述(新版本已改善或变更)。另外新版本中默认为开启状态的优化配置项,在工作中无需修改,也不再复述。

4.5.1 HDFS副本数 dfs.replication(HDFS) 文件副本数,通常设为3,不推荐修改。 如果测试环境只有二台虚拟机(2个datanode节点),此值要修改为2。

4.5.2 Yarn基础配置

4.5.2.1 NodeManager配置 4.5.2.1.1 CPU配置 配置项:yarn.nodemanager.resource.cpu-vcores 表示该节点服务器上yarn可以使用的虚拟CPU个数,默认值是8,推荐将值配置与物理CPU线程数相同,如果节点CPU核心不足8个,要调小这个值,yarn不会智能的去检测物理核心数。

查看 CPU 线程数:

代码语言:javascript复制
grep 'processor' /proc/cpuinfo | sort -u | wc -l

4.5.2.1.2 内存配置

配置项:yarn.nodemanager.resource.memory-mb 设置该nodemanager节点上可以为容器分配的总内存,默认为8G,如果节点内存资源不足8G,要减少这个值,yarn不会智能的去检测内存资源,一般按照服务器剩余可用内存资源进行配置。生产上根据经验一般要预留15-20%的内存,那么可用内存就是实际内存0.8,比如实际内存是64G,那么640.8=51.2G,我们设置成50G就可以了(固定经验值)。

通过CM所有主机查看剩余内存: 可以看到第一台剩余内存为31.3-4.1=27.2G。

注意,要同时设置yarn.scheduler.maximum-allocation-mb为一样的值,yarn.app.mapreduce.am.command-opts(JVM内存)的值要同步修改为略小的值(-Xmx1024m)。 4.5.2.1.3 本地目录 yarn.nodemanager.local-dirs(Yarn) NodeManager 存储中间数据文件的本地文件系统中的目录列表。 如果单台服务器上有多个磁盘挂载,则配置的值应当是分布在各个磁盘上目录,这样可以充分利用节点的IO读写能力。

4.5.2.2 MapReduce内存配置 当MR内存溢出时,可以根据服务器配置进行调整。 mapreduce.map.memory.mb 为作业的每个 Map 任务分配的物理内存量(MiB),默认为0,自动判断大小。 mapreduce.reduce.memory.mb 为作业的每个 Reduce 任务分配的物理内存量(MiB),默认为0,自动判断大小。 mapreduce.map.java.opts、mapreduce.reduce.java.opts Map和Reduce的JVM配置选项。

注意: mapreduce.map.java.opts一定要小于mapreduce.map.memory.mb; mapreduce.reduce.java.opts一定要小于mapreduce.reduce.memory.mb,格式-Xmx4096m。 注意: 此部分所有配置均不能大于Yarn的NodeManager内存配置。

4.5.3 Hive基础配置

4.5.3.1 HiveServer2 的 Java 堆栈

Hiveserver2异常退出,导致连接失败的问题。

解决方法:修改HiveServer2 的 Java 堆栈大小。

4.5.3.2 动态生成分区的线程数 hive.load.dynamic.partitions.thread 用于加载动态生成的分区的线程数。加载需要将文件重命名为它的最终位置,并更新关于新分区的一些元数据。默认值为15。 当有大量动态生成的分区时,增加这个值可以提高性能。根据服务器配置修改。

4.5.3.3 监听输入文件线程数 hive.exec.input.listing.max.threads Hive用来监听输入文件的最大线程数。默认值:15。 当需要读取大量分区时,增加这个值可以提高性能。根据服务器配置进行调整。

4.5.4.2 Reduce结果压缩 是否对任务输出结果压缩,默认值false。对传输数据进行压缩,既可以减少文件的存储空间,又可以加快数据在网络不同节点之间的传输速度。 配置项: 1.mapreduce.output.fileoutputformat.compress 是否启用 MapReduce 作业输出压缩。 2.mapreduce.output.fileoutputformat.compress.codec 指定要使用的压缩编码解码器,推荐SnappyCodec。 3.mapreduce.output.fileoutputformat.compress.type 指定MapReduce作业输出的压缩方式,默认值RECORD,可配置值有:NONE、RECORD、BLOCK。推荐使用BLOCK,即针对一组记录进行批量压缩,压缩效率更高。

4.5.4.3 Hive执行过程通用压缩设置 主要包括压缩/解码器设置和压缩方式设置: mapreduce.output.fileoutputformat.compress.codec(Yarn) map输出所用的压缩编码解码器,默认为org.apache.hadoop.io.compress.DefaultCodec; 推荐使用SnappyCodec:org.apache.hadoop.io.compress.SnappyCodec。 mapreduce.output.fileoutputformat.compress.type 输出产生任务数据的压缩方式,默认值RECORD,可配置值有:NONE、RECORD、BLOCK。推荐使用BLOCK,即针对一组记录进行批量压缩,压缩效率更高。

4.5.4.4 Hive多个Map-Reduce中间数据压缩 控制 Hive 在多个map-reduce作业之间生成的中间文件是否被压缩。压缩编解码器和其他选项由上面Hive通用压缩mapreduce.output.fileoutputformat.compress.*确定。

代码语言:javascript复制
set hive.exec.compress.intermediate=true;

4.5.4.5 Hive最终结果压缩 控制是否压缩查询的最终输出(到 local/hdfs 文件或 Hive table)。压缩编解码器和其他选项由 上面Hive通用压缩mapreduce.output.fileoutputformat.compress.*确定。

代码语言:javascript复制
set hive.exec.compress.output=true;

4.5.5 其他 4.5.5.1 JVM重用(不再支持) 随着Hadoop版本的升级,已自动优化了JVM重用选项,MRv2开始不再支持JVM重用。(旧版本配置项:mapred.job.reuse.jvm.num.tasks、mapreduce.job.jvm.numtasks) 4.5.5.2 Hive执行引擎(了解) CDH支持的引擎包括MapReduce和Spark两种,可自由选择,Spark不一定比MR快,Hive2.x和Hadoop3.x经过多次优化,Hive-MR引擎的性能已经大幅提升。 配置项:hive.execution.engine

4.1.3 Hive分桶

分桶是将数据集分解成更容易管理的若干部分的一个技术,是比分区更为细粒度的数据范围划分。

4.1.4 Hive分桶

分桶是将数据集分解成更容易管理的若干部分的一个技术,是比分区更为细粒度的数据范围划分。

4.1.4.1.1 数据采样 在真实的大数据分析过程中,由于数据量较大,开发和自测的过程比较慢,严重影响系统的开发进度。此时就可以使用分桶来进行数据采样。采样使用的是一个具有代表性的查询结果而不是全部结果,通过对采样数据的分析,来达到快速开发和自测的目的,节省大量的研发成本。

4.1.4.2 分桶和分区的区别 1.分桶对数据的处理比分区更加细粒度化:分区针对的是数据的存储路径;分桶针对的是数据文件; 2.分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜; 3.分桶和分区两者不干扰,可以把分区表进一步分桶。

4.1.4.3 操作 1.创建分桶表 create table test_buck(id int, name string) clustered by(id) sorted by (id asc) into 6 buckets row format delimited fields terminated by ‘t’; CLUSTERED BY来指定划分桶所用列; SORTED BY对桶中的一个或多个列进行排序; into 6 buckets指定划分桶的个数。 分桶规则:HIVE对key的hash值除bucket个数取余数,保证数据均匀随机分布在所有bucket里。

查看分桶表信息

代码语言:javascript复制
desc formatted test_buck;

2.插入数据

代码语言:javascript复制
--启用桶表
set hive.enforce.bucketing=true;
insert into table test_buck select id, name from temp_buck;

hive.enforce.bucketing:启用桶表,数据分桶是否被强制执行,默认false,如果开启,则写入table数据时会启动分桶。

4.1.4.4 文本数据处理 注意:对于分桶表,不能使用load data的方式进行数据插入操作,因为load data导入的数据不会有分桶结构。 如何避免针对桶表使用load data插入数据的误操作呢?

代码语言:javascript复制
--限制对桶表进行load操作
set hive.strict.checks.bucketing = true;

也可以在CM的hive配置项中修改此配置,当针对桶表执行load data操作时会报错。

那么对于文本数据如何处理呢? (1.先创建临时表,通过load data将txt文本导入临时表。

代码语言:javascript复制
--创建临时表
create table temp_buck(id int, name string)
row format delimited fields terminated by 't';
--导入数据
load data local inpath '/tools/test_buck.txt' into table temp_buck;

(2.使用insert select语句间接的把数据从临时表导入到分桶表。

代码语言:javascript复制
--启用桶表
set hive.enforce.bucketing=true;
--限制对桶表进行load操作
set hive.strict.checks.bucketing = true;
--insert select
insert into table test_buck select id, name from temp_buck;
--分桶成功

4.1.4.5 数据采样 对表分桶一般有两个目的,提高数据查询效率、抽样调查。通过前面的讲解,我们已经可以对分桶表进行正常的创建并导入数据了。一般在实际生产中,对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果而不是全部结果,比如在开发自测的时候。这个时候Hive就可以通过对表进行抽样来满足这个需求。

代码语言:javascript复制
select * from table tablesample(bucket x out of y on column)

hive根据y的大小,决定抽样的比例。y必须是table总bucket数的倍数或者因子。 例如,table总共分了10份bucket,当y=2时,抽取(10/2=)5个bucket的数据,当y=10时,抽取(10/10=)1个bucket的数据。

x表示从哪个bucket开始抽取,如果需要取多个分区,以后的分区号为当前分区号加上y。 例如,table总bucket数为6,tablesample(bucket 1 out of 2),表示总共抽取(6/2=)3个bucket的数据,从第1个bucket开始,抽取第1(x)个和第3(x y)个和第5(x y)个bucket的数据。

注意:x的值必须小于等于y的值。否则会抛出异常:FAILED: SemanticException [Error 10061]: Numerator should not be bigger than denominator in sample clause for table stu_buck。 栗子

代码语言:javascript复制
select * from test_buck tablesample(bucket 1 out of 10 on id);

注意:sqoop不支持分桶表,如果需要从sqoop导入数据到分桶表,可以通过中间临时表进行过度。ODS也可以不做分桶,从DWD明细层开始分桶。 4.1.4.6 Map Join MapJoin顾名思义,就是在Map阶段进行表之间的连接。而不需要进入到Reduce阶段才进行连接。这样就节省了在Shuffle阶段时要进行的大量数据传输。从而起到了优化作业的作用。 要使MapJoin能够顺利进行,那就必须满足这样的条件:除了一份表的数据分布在不同的Map中外,其他连接的表的数据必须在每个Map中有完整的拷贝。 所以并不是所有的场景都适合用MapJoin。它通常会用在如下的一些情景:在二个要连接的表中,有一个很大,有一个很小,这个小表可以存放在内存中而不影响性能。 这样我们就把小表文件复制到每一个Map任务的本地,再让Map把文件读到内存中待用。 在Hive v0.7之前,需要使用hint提示 /* mapjoin(table) */才会执行MapJoin。Hive v0.7之后的版本已经不需要给出MapJoin的指示就进行优化。现在可以通过如下配置参数来进行控制:

代码语言:javascript复制
set hive.auto.convert.join=true;

Hive还提供另外一个参数–表文件的大小作为开启和关闭MapJoin的阈值:

代码语言:javascript复制
--旧版本为hive.mapjoin.smalltable.filesize
set hive.auto.convert.join.noconditionaltask.size=512000000

注意,如果hive.auto.convert.join是关闭的,则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于512MB,则直接将连接转为Map连接。默认值为20MB。

MapJoin的使用场景:

  1. 关联操作中有一张表非常小
  2. 不等值的链接操作

4.1.4.6.1 大小表关联

代码语言:javascript复制
select f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802)  

该语句中B表有30亿行记录,A表只有100行记录,而且B表中数据倾斜特别严重,有一个key上有15亿行记录,在运行过程中特别的慢,而且在reduece的过程中遇到执行时间过长或者内存不够的问题。 MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map时进行了join操作,省去了reduce运行的效率会高很多。 这样就不会由于数据倾斜导致某个reduce上落数据太多而失败。于是原来的sql可以通过使用hint的方式指定join时使用mapjoin。

代码语言:javascript复制
select /*  mapjoin(A)*/ f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802) 

4.1.4.6.2 不等连接 mapjoin还有一个很大的好处是能够进行不等连接的join操作,如果将不等条件写在where中,那么mapreduce过程中会进行笛卡尔积,运行效率特别低,如果使用mapjoin操作,在map的过程中就完成了不等值的join操作,效率会高很多。

代码语言:javascript复制
select A.a ,A.b from A join B where A.a>B.a

4.1.4.7 Bucket-MapJoin

4.1.4.7.1 作用 两个表join的时候,小表不足以放到内存中,但是又想用map side join这个时候就要用到bucket Map join。其方法是两个join表在join key上都做hash bucket,并且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照key join,做hash bucket。小表依然复制到所有节点,Map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket做局部join,这样每次只需要加载部分hashtable就可以了。 4.1.4.7.2 条件 1) set hive.optimize.bucketmapjoin = true; 2) 一个表的bucket数是另一个表bucket数的整数倍 3) bucket列 == join列 4) 必须是应用在map join的场景中

注意:如果表不是bucket的,则只是做普通join。

4.1.4.8 SMB Join 全称Sort Merge Bucket Join。 4.1.4.8.1 作用 大表对小表应该使用MapJoin来进行优化,但是如果是大表对大表,如果进行shuffle,那就非常可怕,第一个慢不用说,第二个容易出异常,此时就可以使用SMB Join来提高性能。SMB Join基于bucket-mapjoin的有序bucket,可实现在map端完成join操作,可以有效地减少或避免shuffle的数据量。SMB join的条件和Map join类似但又不同。

4.1.4.8.2 条件

bucket mapjoin SMB join set hive.optimize.bucketmapjoin = true; set hive.optimize.bucketmapjoin = true; set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.auto.convert.sortmerge.join.noconditionaltask=true; 一个表的bucket数是另一个表bucket数的整数倍 小表的bucket数=大表bucket数 bucket列 == join列 Bucket 列 == Join 列 == sort 列 必须是应用在map join的场景中 必须是应用在bucket mapjoin 的场景中

4.1.4.8.3 确保分同列排序 hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表数据sorted,否则可能数据不正确。 有两个办法: 1)hive.enforce.sorting 设置为 true。开启强制排序时,插数据到表中会进行强制排序,默认false。 2)插入数据时通过在sql中用distributed c1 sort by c1 或者 cluster by c1

另外,表创建时必须是CLUSTERED且SORTED,如下: create table test_smb_2(mid string,age_id string) CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS; 综上,涉及到分桶表操作的齐全配置为:

代码语言:javascript复制
--写入数据强制分桶
set hive.enforce.bucketing=true;
--写入数据强制排序
set hive.enforce.sorting=true;
--开启bucketmapjoin
set hive.optimize.bucketmapjoin = true;
--开启SMB Join
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

开启MapJoin的配置(hive.auto.convert.join和hive.auto.convert.join.noconditionaltask.size),还有限制对桶表进行load操作(hive.strict.checks.bucketing)可以直接设置在hive的配置项中,无需在sql中声明。 自动尝试SMB联接(hive.optimize.bucketmapjoin.sortedmerge)也可以在设置中进行提前配置。

0 人点赞