老工在职场多年,从事过海量(PB级)数据的关系型数据库数据处理工作,后由于数据平台升级的要求,将数据迁移到Hadoop集群,做了多年的数据研发和数据产品的研发工作,从业务理解、数据模型构建、数据采集、数据清洗,到数据产品前端/服务端的研发都做过,基本涵盖了数据的生命周期。对于Hive调优,老工自有一番理解。下面将从一个过度优化的案例说起。
从一个过度优化案例说起
某天,老工在对小白的代码进行代码评审,发现了一个去重计数的代码案例,下面具体介绍。
【案例2.10】 去重计数案例。
代码语言:javascript复制select count(1) from(
select s_age
from student_tb_orc
group by s_age
) b
这是简单统计年龄的枚举值个数,为什么不用distinct?
【案例2.11】 简化的去重计数。
代码语言:javascript复制select count(distinct s_age)
from student_tb_orc
小白认为:案例2.10的代码在数据量特别大的情况下能够有效避免Reduce端的数据倾斜,案例2.10可能会比案例2.11效率高。
我们先不管数据量特别大这个问题,就当前的业务和环境下,案例2.11一定会比案例2.10的效率高,原因有以下几点:
(1)进行去重的列是s_age列,它的业务含义表示年龄。既然是年龄,说明它的可枚举值非常有限,如果转化成MapReduce来解释的话,在Map阶段,每个Map会对s_age去重。由于s_age枚举值有限,因而每个Map得到的s_age也有限,最终得到reduce的数据量也就是map数量*s_age枚举值的个数。
假如执行案例2.10的代码Map的数量有100个,s_age的最大枚举值有100个,每个Map过滤后的数据都含有s_age的所有枚举值,且s_age是int型占4个字节,那么传输到Reduce的数据量就是10 000条记录,总数据量是40KB,这么小的数据量,不需要避免数据倾斜。
(2)案例2.11中,distinct的命令会在内存中构建一个hashtable,查找去重的时间复杂度是O(1);案例2.10中,group by在不同版本间变动比较大,有的版本会用构建hashtable的形式去重,有的版本会通过排序的方式,排序最优时间复杂度无法到O(1 )。另外,案例2.10会转化为两个任务,会消耗更多的磁盘网络I/O资源。
(3)最新的Hive 3.0中新增了count(distinct)优化,通过配置hive.optimize.countdistinct,即使真的出现数据倾斜也可以自动优化,自动改变SQL执行的逻辑。
(4)案例2.11比案例2.10代码简洁,表达的意思简单明了,如果没有特殊的问题,代码简洁就是优。
为了佐证这个想法,可以一起执行下这两段代码,比较一下代码的执行结果。老工执行完后,分别贴出了上面的两个案例,即案例2.10和案例2.11的执行结果。
案例2.10的执行结果如下。
代码语言:javascript复制INFO : Query ID = hive_20181022145656_9bf4913b-006f-4211-9d73-5ac6f0161033
INFO : Total jobs = 2
INFO : Launching Job 1 out of 2
INFO : Starting task [Stage-1:MAPRED] in serial mode
...此处省略非关键的打印信息
INFO : MapReduce Total cumulative CPU time: 39 seconds 590 msec
INFO : Ended Job = job_1537177728748_3164
INFO : Launching Job 2 out of 2
INFO : Starting task [Stage-2:MAPRED] in serial mode
....此处省略非关键的打印信息
INFO : MapReduce Total cumulative CPU time: 7 seconds 710 msec
INFO : Ended Job = job_1537177728748_3165
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 3 Reduce: 4 Cumulative CPU: 39.59 sec HDFS
Read: 55151260 HDFS Write: 464 SUCCESS
INFO : Stage-Stage-2: Map: 3 Reduce: 1 Cumulative CPU: 7.71 sec HDFS
Read: 8683 HDFS Write: 6 SUCCESS
INFO : Total MapReduce CPU Time Spent: 47 seconds 300 msec
案例2.11的执行结果如下:
代码语言:javascript复制INFO : Query ID = hive_20181022145353_3973c188-bae1-40ea-a82a-980a61562e96
INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in serial mode
...此处省略非关键的打印信息
INFO : MapReduce Total cumulative CPU time: 28 seconds 360 msec
INFO : Ended Job = job_1537177728748_3162
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 3 Reduce: 1 Cumulative CPU: 28.36 sec HDFS
Read: 55143184 HDFS Write: 6 SUCCESS
INFO : Total MapReduce CPU Time Spent: 28 seconds 360 msec
案例2.10和2.11执行结果对比:
- 案例2.10总共耗时47秒;
- 案例2.11总共耗时28秒。
看到案例2.10和案例2.11的执行结果,通过执行计划可以查看两者执行过程中的逻辑差别。
如果读者之前对执行计划不熟悉,也没关系,只要能看懂下面执行计划中的几个关键字,理清SQL的执行逻辑就好。随后老工贴出了两个案例的执行计划,并逐一做了解释。案例2.10的执行计划如下:
代码语言:javascript复制STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
//第一个Stage
Stage: Stage-1
Map Reduce
//Map的操作
Map Operator Tree:
TableScan
alias: student_tb_orc
Select Operator
expressions: s_age (type: bigint)
outputColumnNames: s_age
Group By Operator
keys: s_age (type: bigint)
mode: hash
outputColumnNames: _col0
Reduce Output Operator
key expressions: _col0 (type: bigint)
sort order:
Map-reduce partition columns: _col0 (type: bigint)
//Reduce的操作
Reduce Operator Tree:
Group By Operator
keys: KEY._col0 (type: bigint)
mode: mergepartial
outputColumnNames: _col0
Select Operator
Group By Operator
aggregations: count(1)
mode: hash
outputColumnNames: _col0
//第二个Stage
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
sort order:
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
注意:原有的执行计划太长,为了突出重点,方便阅读,将执行计划中的部分信息省略了。
图2.6 案例2.10执行计划简化图
上面有两个Stage,即Stage-1和Stage-2(Stage-0一般表示计算完后的操作,对程序集群中的运行没有影响),分别表示两个任务,说明这个SQL会转化成两个MapReduce。我们先只关注上面执行结果中的黑体字,整个案例2.10的执行计划结构可以抽象成如图2.6所示的形式。
在Stage-1框中,整个作业又被抽象成Map和Reduce两个操作,分别用S-1 MAP和S-1 REDUCE表示。我们循着S-1 MAP/REDUCE来解读案例2.10的执行计划。
按S-1 Map框的缩进解读案例2.10的执行计划如下:
(1)扫描操作。
(2)在步骤1的基础上执行列筛选(列投影)的操作。
(3)在步骤2的基础上按s_age列分组聚合(group by),最后只输出key值,value的值抛弃,不输出。
按S-1 Reduce框的缩进解读案例2.10的执行计划如下:
(1)按KEY._col0(s_age)聚合。
(2)计算步骤(1)中每个s_age包含的学生个数,即count(1),最终输出key(s_age),抛弃无用的计算结果,即每个s_age包含的学生个数这个结果抛弃。
注意:这里只是算出每个年龄段的个数,而计算结果是要计算出不同年龄枚举值的个数。
经过上面的分析知道,Stage-1其实表达的就是子查询select s_age from student_tb_orc group by s_age的实际逻辑。输出的结果只是去重后的s_age。
为了计算去重后s_age的个数,Hive启动了第二个MapReduce作业,在执行计划里面用Stage-2表示。Stage-2被抽象成Map和Reduce两个操作。在图2.6中分别用S-2 MAP和S-2 REDUCE框表示,我们循着S-2 MAP/ REDUCE来解读案例2.11的执行计划。
图2.7 案例2.10的程序流程图
按S-2 Map框的缩进解读案例2.11的执行计划如下:
(1)读取Stage-1输出的结果。
(2)直接输出一列_col0,由于没有指定要去读的列,因而这里只是输出了每个s_age所在文件行的偏移量。
按S-2 Reduce框的缩进解读案例2.11的执行计划计算vlaue._col0(map输出的_col0)的个数,并输出。
整个Stage-2的逻辑就是select count(1) from (…)a这个SQL的逻辑。为了方便理解,可以对照图2.7的程序流程图来理解逻辑。
接着来看案例2.11对应的执行计划:
代码语言:javascript复制explain
select count(distinct s_age) from student_tb_orc;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
//唯一的Stage
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: student_tb_orc
Select Operator
expressions: s_age (type: bigint)
outputColumnNames: s_age
Group By Operator
aggregations: count(DISTINCT s_age)
keys: s_age (type: bigint)
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions: _col0 (type: bigint)
sort order:
Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col0:0._col0)
mode: mergepartial
outputColumnNames: _col0
案例2.11的执行计划相对于案例2.10来说简单得多。同时,也可以看到只有一个Stage-1,即只有一个MapReduce作业。将上述执行计划抽象成图2.8的结构来进行解读。
图2.8 案例2.11Stage-1的执行计划
按S-1 Map框的缩进解读案例2.11的执行计划如下:
(1)获取表的数据。
(2)列的投影,筛选出s_age列。
(3)以s_age作为分组列,并计算s_age去重后的个数,最终输出的只有s_age列,计算s_age去重后个数的值会被抛弃。
注意:这里计算s_age去重后的个数,仅仅只是操作一个Map内处理的数据,即只是对部分数据去重。一个任务中有多个Map,如果存在相同的值则是没有做去重,要做到全局去重,就只能在Reduce中做。
按S-1 Reduce框的缩进解读案例2.11的执行计划。可以看到,Reduce阶段只是对key._col0(s_age)进行全局去重,并输出该值。为了方便理解,可以对照图2.9来理解。
图2.9 案例2.11的程序流程图
对比上面两个执行计划的逻辑我们可以知道,案例2.10是将去重(distinct)和计数放到两个MapReduce作业中分别处理;而案例2.11是将去重和计数放到一个MapReduce作业中完成。下面将两个案例流程放在一起对比,如图2.10所示。
图2.10 案例2.10和案例2.11的逻辑对比图
从图2.10中可以知道,案例2.10的数据处理逻辑集中在Stage-1-Map、Stage-1-Reduce和Stage-2-Reduce这3个部分;案例2.11的数据处理逻辑集中在Stage-1-Map、Stage-1-Reduce这两个部分。
从实际业务来讲,不同s_age的枚举个数相比于源表student_tb_orc的总数是非常有限的,且两个用到的算法相似,因此在这里可以认为案例2.10整体的数据处理逻辑的总体耗时和案例2.11的数据处理复杂度近似。这一点在YARN的日志中也会看到。这两个案例的时间差主要集中在数据传输和中间任务的创建下,就是图2.10中的虚线框部分,因此通过distinct关键字比子查询的方式效率更高。
采用案例2.10的写法,什么时候会比案例2.11高呢?在有数据倾斜的情况下,案例2.10的方式会比案例2.11更优。什么是数据倾斜?是指当所需处理的数据量级较大时,某个类型的节点所需要处理的数据量级,大于同类型的节点一个数量级(10倍)以上。这里的某个类型的节点可以指代执行Map或者Reduce的节点。
当数据大到一定的量级时,案例2.10有两个作业,可以把处理逻辑分散到两个阶段中,即第一个阶段先处理一部分数据,缩小数据量,第二个阶段在已经缩小的数据集上继续处理。而案例2.11,经过Map阶段处理的数据还非常多时,所有的数据却都需要交给一个Reduce节点去处理,就好比千军万马过独木桥一样,不仅无法利用到分布式集群的优势,还要浪费大量时间在等待,而这个等待的时间远比案例2.10多个MapReduce所延长的流程导致额外花费的时间还多。
如前面所说,在Hive 3.0中即使遇到数据倾斜,案例2.11将hive.optimize.countdistinct设置为true,则整个写法也能达到案例2.10的效果。
调优讲究适时调优,过早进行调优有可能做的是无用功甚至产生负效应,在调优上投入的工作成本和回报不成正比。调优需要遵循一定的原则。
本文摘编自《Hive性能调优实战》,经出版方授权发布。