(七)Hive总结

2020-09-20 19:43:59 浏览数 (1)

1.6.1 Hive的架构

hive架构.png

1.6.2 Hive和数据库比较

Hive 和数据库除了拥有类似的查询语言,再无类似之处。 1)数据存储位置 Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。 2)数据更新 Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的, 3)执行延迟 Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。 4)数据规模 Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。

1.6.3 内部表和外部表

1)管理表:当我们删除一个管理表时,Hive也会删除这个表中数据。管理表不适合和其他工具共享数据。 2)外部表:删除该表并不会删除掉原始数据,删除的是表的元数据

1.6.4 四个By区别

1)Sort By:分区内有序; 2)Order By:全局排序,只有一个Reducer; 3)Distrbute By:类似MR中Partition,进行分区,结合sort by使用。 4) Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

1.6.5 窗口函数

RANK() 排序相同时会重复,总数不会变 DENSE_RANK() 排序相同时会重复,总数会减少 ROW_NUMBER() 会根据顺序计算 1) OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化 2)CURRENT ROW:当前行 3)n PRECEDING:往前n行数据 4) n FOLLOWING:往后n行数据 5)UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点 6) LAG(col,n):往前第n行数据 7)LEAD(col,n):往后第n行数据 8) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。

1.6.6 自定义UDF、UDTF

在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤? 1)自定义过。 2)用UDF函数解析公共字段;用UDTF函数解析事件字段。 自定义UDF:继承UDF,重写evaluate方法 自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close 为什么要自定义UDF/UDTF,因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试.

1.6.7 Hive优化

1)MapJoin 如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。 2)行列过滤 列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。 行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。 3)列式存储 4)采用分区技术 5)合理设置Map数 (1)通常情况下,作业会通过input的目录产生一个或者多个map任务。 主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。 (2)是不是map数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。 (3)是不是保证每个map处理接近128m的文件块,就高枕无忧了? 答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。 针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数; 6)小文件进行合并 在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。 7)合理设置Reduce数 Reduce个数并不是越多越好 (1)过多的启动和初始化Reduce也会消耗时间和资源; (2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题; 在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适; 8)常用参数 // 输出合并小文件 SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件 SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件 SET hive.merge.size.per.task = 268435456; -- 默认256M SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于16m该值时,启动一个独立的map-reduce任务进行文件merge 9)开启map端combiner(不影响最终业务逻辑) set hive.map.aggr=true; 10)压缩(选择快的) 设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率) 11)开启JVM重用

1.6.8 Hive解决数据倾斜方法

业务背景

用户轨迹工程的性能瓶颈一直是etract_track_info,其中耗时大户主要在于trackinfo与pm_info进行左关联的环节,trackinfo与pm_info两张表均为GB级别,左关联代码块如下:

from trackinfo a left outer join pm_info b on (a.ext_field7 = b.id)

改动为上面代码后,效果仍然不理想,耗时为1.5小时。 第二次优化 考虑到trackinfo表的ext_field7字段缺失率很高(为空、字段长度为零、字段填充了非整数)情况,做进行左关联时空字段的关联操作实际上没有意义,因此,如果左表关联字段ext_field7为无效字段,则不需要关联,因此,改为如下:

from trackinfo a left outer join pm_info b on (a.ext_field7 is not null and length(a.ext_field7) > 0 and a.ext_field7 rlike '^[0-9] $' and a.ext_field7 = b.id)

上面代码块的作用是,如果左表关联字段ext_field7为无效字段时(为空、字段长度为零、字段填充了非整数),不去关联右表,由于空字段左关联以后取到的右表字段仍然为null,所以不会影响结果。 改动为上面代码后,效果仍然不理想,耗时为50分钟。 第三次优化 想了很久,第二次优化效果效果不理想的原因,其实是在左关联中,虽然设置了左表关联字段为空不去关联右表,但是这样做,左表中未关联的记录(ext_field7为空)将会全部聚集在一个reduce中进行处理,体现为reduce进度长时间处在99%。 换一种思路,解决办法的突破点就在于如何把左表的未关联记录的key尽可能打散,因此可以这么做:若左表关联字段无效(为空、字段长度为零、字段填充了非整数),则在关联前将左表关联字段设置为一个随机数,再去关联右表,这么做的目的是即使是左表的未关联记录,它的key也分布得十分均匀

from trackinfo a left outer join pm_info b on ( case when (a.ext_field7 is not null and length(a.ext_field7) > 0 and a.ext_field7 rlike '^[0-9] $') then cast(a.ext_field7 as bigint) else cast(ceiling(rand() * -65535) as bigint) end = b.id )

1)怎么产生的数据倾斜? 不同数据类型关联产生数据倾斜 情形:比如用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时。 后果:处理此特殊值的reduce耗时;只有一个reduce任务 默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。 解决方式:把数字类型转换成字符串类型 select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)

2)解决数据倾斜的方法? (1)group by 注:group by 优于distinct group 解决方式:采用sum() group by的方式来替换count(distinct)完成计算。 (2)mapjoin (3)开启数据倾斜时负载均衡 set hive.groupby.skewindata=true; 思想:就是先随机分发并处理,再按照key group by来分发处理。 操作:当选项设定为true,生成的查询计划会有两个MRJob。 第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的; 第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的原始GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。 点评:它使计算变成了两个mapreduce,先在第一个中在 shuffle 过程 partition 时随机给 key 打标记,使每个key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上。 所以需要第二次的mapreduce,这次就回归正常 shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次mr中随机分布到各个节点完成。 (4)控制空值分布 将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个Reducer。 注:对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算量大大减少 实践中,可以使用case when对空值赋上随机值。此方法比直接写is not null更好,因为前者job数为1,后者为2. 使用case when实例1: select userid, name from user_info a join ( select case when userid is null then cast (rand(47)* 100000 as int ) else userid end from user_read_log ) b on a.userid = b.userid 使用case when实例2: select '

{date}000000' and case when a.new_brand_id is null then concat('hive',rand() ) else a.new_brand_id end = f.brand_id; 如果上述的方法还不能解决,比如当有多个JOIN的时候,建议建立临时表,然后拆分HIVE SQL语句。 1.6.9 用的是动态分区吗?动态分区的底层原理是什么? a. 静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。 b. 详细来说,静态分区的列实在编译时期,通过用户传递来决定的;动态分区只有在 SQL 执行时才能决定。 c. 动态分区是基于查询参数的位置去推断分区的名称,从而建立分区

  1. Hive里边字段的分隔符用的什么?为什么用t?有遇到过字段里边有t的情况吗,怎么处理的? hive 默认的字段分隔符为ascii码的控制符01(^A),建表的时候用fields terminated by '01'

遇到过字段里边有t的情况,自定义InputFormat,替换为其他分隔符再做后续处理。

遇错namenode挂了,手动重启: hadoop-daemon.sh start namenode

0 人点赞