1 Hive数据倾斜的现象
通常认为当所有的map task全部完成,并且99%的reduce task完成,只剩下一个或者少数几个reduce task一直在执行,这种情况下一般都是发生了数据倾斜。
即为在整个计算过程中,大量相同的key被分配到了同一个reduce任务上造成。Hive的数据倾斜本质上是MapReduce计算引擎的数据倾斜,一般来说容易发生在reduce阶段,map阶段的数据倾斜多是由于HDFS存储数据文件源的问题,reduce阶段则多是开发过程中程序员引起,需要通过手段进行优化。
本文仅讨论基于MR引擎的Hive数据倾斜现象,另外Spark、Flink中的数据倾斜择日再论。
1.1 Hive数据倾斜的场景
Hive数据倾斜是指在数据分布中存在不均匀的情况,业务问题或者业务数据本身的问题,某些数据比较集中,导致某些节点或分区上的数据量远远大于其他节点或分区,从而影响查询性能和任务的均衡执行,尤其是join。以下是一些可能导致Hive数据倾斜的场景:
- 连接操作中的键值倾斜:在进行join连接操作时,如果连接的键存在不均匀分布、数据类型不一致,会导致某些键对应的数据量远大于其他键,造成倾斜。表中作为关联条件的字段值为0或空值的较多,会造成shuffle时进入到一个reduce任务中。为什么是空值?因为空值在根据hash计算分区时,会在内存中被视为同样的hash,进而被放入一个分区进行计算。
- 分桶表和分区表的数据倾斜:如果在分桶表或分区表中,某些分桶或分区的数据量过大,超过了其他分桶或分区的数据量,就会造成倾斜。
- 聚合操作的倾斜:在执行聚合操作(如GROUP BY、COUNT、SUM等)时,如果被聚合的列数据分布不均匀,会导致聚合操作的任务负载不平衡,Count(distinct id ) 去重统计要慎用。
- 高基数列的倾斜:某些列的基数(唯一值的数量)很高,而其他列的基数较低,可能导致以高基数列为基准进行的连接或聚合操作产生数据倾斜。
- 随机写入场景:当数据随机写入分区表或分桶表时,可能会导致某些分区或分桶的数据量增长迅速,从而引发倾斜。
- 数据导入方式不均匀:如果使用了多个任务同时导入数据,而这些任务在导入数据时的输入源数据分布不均匀,就会导致数据倾斜。
1.2 解决数据倾斜问题的优化思路
1.2.1 代码层面:
- 检查连接键和分区键:检查连接和分组操作的键,确保数据分布均匀,避免倾斜。可以考虑在键中引入随机数,或者对键进行散列操作。
- 使用MapJoin和Broadcast Join:对于连接操作,使用MapJoin或Broadcast Join可以将小表复制到每个节点上,避免数据倾斜。
- 检查聚合操作:如果有聚合操作,尤其是GROUP BY,确保被聚合的列数据分布均匀,可以考虑使用采样数据进行预估。
- 调整存储格式:选择合适的列式存储格式(如ORC、Parquet),可以减少数据读取,提高性能。
- 数据倾斜监控和日志:在代码中添加数据倾斜监控和日志,便于发现和定位倾斜的数据。
- group by 代替 distinct:当要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因与order by类似,count(distinct)逻辑只会有很少的reducer来处理。
- 列裁剪和分区裁剪:所谓列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。Hive中与列裁剪优化相关的配置项是
hive.optimize.cp
,与分区裁剪优化相关的则是hive.optimize.pruner
,默认都是true
。
1.2.2 配置层面:
- 动态分桶和分区:对于分桶和分区表,使用动态分桶和分区可以根据数据分布情况进行自动优化。
- 并行度设置:根据集群的规模和硬件配置,适当调整并行度,避免某些任务负载过重。
- 调整资源分配:分配合适的资源给任务,避免资源争夺导致倾斜。
1.2.3 参数调整:
- 调整shuffle参数:调整shuffle相关的参数,如mapreduce.reduce.shuffle.input.buffer.percent、mapreduce.reduce.shuffle.parallelcopies等。
- 调整内存参数:根据任务的实际需求,调整内存参数,避免内存不足引发倾斜。
1.2.4 其他思路:
- 数据抽样分析:使用抽样数据进行分析,了解数据分布情况,有助于更好地优化查询。
- 使用中间表:将复杂的查询过程分解成多个步骤,将中间结果保存在临时表中,减少大查询的复杂性。
- 使用UDF和UDAF:编写自定义函数和聚合函数,对倾斜数据进行特殊处理,分散数据分布。
- 数据重分布:通过数据重分布操作,将倾斜数据均匀地分布到不同节点上。
- 增加节点数:如果集群规模允许,可以考虑增加节点数,从而分担负载,减轻数据倾斜。
2 解决Hive数据倾斜问题的方法
解决方案需要具体问题具体分析,综合考虑资源、数据量等多种因素,以下方案有相互交叉的内容,需要研判考虑:
2.1 开启负载均衡
代码语言:javascript复制-- map端的Combiner,默认为ture
set hive.map.aggr=true;
-- 开启负载均衡
set hive.groupby.skewindata=true (默认为false)
这行代码是在Hive中用于处理数据倾斜的配置代码。它的作用是开启Hive中的负载均衡优化,以应对数据倾斜的情况。
具体来说:
-
hive.map.aggr=true
:默认情况下,Hive在执行聚合操作时(如GROUP BY、SUM、AVG等),会在Map端进行部分聚合(Partial Aggregation),以减少数据的传输量。这个配置项开启了Map端的部分聚合,可以在Map阶段对部分数据进行聚合,减少数据传输到Reducer的量。 -
hive.groupby.skewindata=true
:这个配置项是为了应对数据倾斜的情况。数据倾斜指的是在进行聚合操作时,部分数据分布不均匀,导致部分Reducer处理的数据量远大于其他Reducer。开启此配置项会在数据倾斜的情况下,将数据倾斜的Key单独划分到一个Reducer,以实现负载均衡,减少单个Reducer的负载。
总体来说,这两个配置项的作用是在MapReduce过程中,优化聚合操作和应对数据倾斜,从而提高作业的执行效率和稳定性。但是,这只是配置项的作用描述,具体的优化效果还需要根据实际数据和作业情况进行实验和观察。
2.2 引入随机性
通过在连接键或分区键中引入随机数、数据加盐等方式,将倾斜的数据打散,使其分布均匀化,减少倾斜。
- 使用随机前缀:
-- 创建分桶表,内部外部表也行
CREATE TABLE skewed_table (
id INT,
value STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS;
-- 插入数据到分桶表
INSERT INTO TABLE skewed_table
SELECT id, value FROM source_data;
-- 添加随机前缀列
-- 这里使用FLOOR(rand() * 100)生成一个0到99的随机整数,作为随机前缀
SELECT id, value, FLOOR(rand() * 100) AS random_prefix
FROM skewed_table;
- 使用哈希函数:
-- 创建分桶表,内部外部表也行
CREATE TABLE skewed_table (
id INT,
value STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS;
-- 插入数据到分桶表
INSERT INTO TABLE skewed_table
SELECT id, value FROM source_data;
-- 使用哈希函数生成分桶列
-- 这里使用MD5哈希函数将id列哈希为一个字符串,然后将哈希字符串转换为整数
SELECT id, value, CAST(CONV(SUBSTRING(MD5(CAST(id AS STRING)), 1, 8), 16, 10) % 4 AS INT) AS hash_bucket
FROM skewed_table;
- 使用窗口函数和随机数:
-- 创建分桶表,内部外部表也行
CREATE TABLE skewed_table (
id INT,
value STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS;
-- 插入数据到分桶表
INSERT INTO TABLE skewed_table
SELECT id, value FROM source_data;
-- 使用窗口函数和随机数生成分桶列
-- 这里使用ROW_NUMBER()窗口函数和FLOOR(rand() * 4)生成一个随机分桶号
SELECT id, value, FLOOR(rand() * 4) AS random_bucket
FROM (
SELECT id, value, ROW_NUMBER() OVER (PARTITION BY id) AS row_num
FROM skewed_table
) t;
- 使用分桶表解决连接数据倾斜:
-- 创建两个分桶表
CREATE TABLE table1 (
id INT,
value STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS;
CREATE TABLE table2 (
id INT,
data STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS;
-- 插入数据到分桶表
INSERT INTO TABLE table1
SELECT id, value FROM source_data1;
INSERT INTO TABLE table2
SELECT id, data FROM source_data2;
-- 使用分桶表解决连接数据倾斜
-- 对两个表都使用相同的分桶列,并且分桶数也相同,可以减少连接时的数据倾斜
SELECT t1.id, t1.value, t2.data
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id;
2.3 使用MapJoin或Broadcast Join
对于连接操作,reduce join 转换成 MapJoin,使用MapJoin或Broadcast Join可以将小表复制到每个节点上,避免数据倾斜。
Map-side Join(MapJoin)是一种用于处理数据倾斜问题的方法,特别适用于一个小表和一个大表进行连接的场景。在MapJoin中,小表被缓存在内存中,并与大表进行连接操作,以减少大表的数据复制和数据倾斜问题。以下是如何使用MapJoin来解决数据倾斜问题的步骤:
- 准备数据: 假设有一个大表
big_table
和一个小表small_table
,需要根据某个共同的列进行连接。 - 设置MapJoin: 在Hive中,可以通过设置参数来启用MapJoin。
-- 设置MapJoin
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25000000; -- 小表的大小阈值,单位为字节
set hive.auto.convert.join.noconditionaltask=true; -- 仅进行MapJoin,不使用Reduce阶段
- 对小表进行Bucket操作: 将小表进行Bucket操作,使其和大表具有相同的Bucket数量。
-- 对小表进行Bucket操作
CREATE TABLE small_table_bucketed
CLUSTERED BY (join_column) INTO N BUCKETS
AS
SELECT * FROM small_table;
- 执行MapJoin查询: 编写查询语句,使用MapJoin来连接大表和经过Bucket操作的小表。
SELECT /* MAPJOIN(small_table_bucketed) */ big_table.*, small_table_bucketed.*
FROM big_table
JOIN small_table_bucketed ON big_table.join_column = small_table_bucketed.join_column;
在这个过程中,MapJoin会将小表的数据加载到内存中,并在Map阶段进行连接操作,从而避免了大表的数据复制和数据倾斜问题。需要注意的是,MapJoin适用于小表和大表的大小阈值适当的情况下,如果小表过大,可能会导致内存不足的问题。
总之,MapJoin是一种有效的方法来解决数据倾斜问题,特别适用于小表和大表的连接操作,通过在Map阶段进行连接,减少了数据复制和数据倾斜的可能性。
2.4 调整数据存储格式
调整存储格式,如使用ORC或Parquet等列式存储格式,或者开启输出压缩,可以减少不必要的数据读取,改善数据倾斜。
代码语言:javascript复制// 开启Map端输出压缩
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
这行代码是在MapReduce程序中使用Hadoop的Configuration
类来配置Map端的输出压缩。虽然这行代码本身并不直接处理数据倾斜,但它可以在一定程度上优化作业的性能,从而减轻数据倾斜造成的影响。
数据倾斜可能导致部分Reducer的负载过重,而启用Map端输出压缩可以在一定程度上减小传输数据量,从而减轻Reducer的负担。具体来说,这段代码的作用是:
-
conf.setBoolean("mapreduce.map.output.compress", true);
:这一行代码启用了Map端输出的压缩。MapReduce作业产生的中间数据(Map输出数据)在传输到Reducer之前可以进行压缩,减小数据的传输量,从而加快数据传输速度。 -
conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
:这一行代码指定了使用的压缩编解码器。在这个例子中,使用的是Gzip压缩编解码器(GzipCodec.class
),它可以对中间数据进行Gzip压缩。
通过开启Map端输出压缩,可以减小Map输出数据的传输量,从而减轻了网络传输的压力。这在数据倾斜的情况下可能会有一定的帮助,因为数据倾斜往往会导致部分Reducer需要处理较多的数据,通过减小传输数据量,可以加快数据的传输速度,从而在一定程度上减轻了数据倾斜带来的影响。然而,需要注意的是,这只是优化的一部分,实际情况可能还需要结合其他优化策略来解决数据倾斜问题。
2.5 分桶表、分区表
通过调整查询计划,如使用分桶表、分区表等,可以将任务负载均衡分配,减少数据倾斜。
分桶表是Hive中一种用于优化查询性能的技术,它可以在一定程度上帮助解决数据倾斜问题。分桶表将数据按照指定的列进行哈希分桶存储,每个分桶都包含了一部分数据,使得数据更加均匀地分布在不同的分桶中。当进行Join操作时,如果参与Join的两个表都是分桶表并且使用相同的分桶列,那么可以通过哈希分桶的方式来提高Join的效率,减轻数据倾斜问题。
下面是分桶表如何解决Join中的数据倾斜问题的基本步骤:
- 选择合适的分桶列: 首先,需要根据实际情况选择合适的列作为分桶列。通常情况下,可以选择参与Join的列作为分桶列。
- 创建分桶表: 将需要进行Join的表创建为分桶表,并指定分桶列和分桶数量。分桶数量应该根据数据量来合理设置,以确保数据能够均匀地分布在各个分桶中。
-- 创建分桶表A,指定分桶列bucket_col和分桶数量4
CREATE TABLE table_A (
id INT,
value STRING
)
CLUSTERED BY (bucket_col) INTO 4 BUCKETS;
-- 创建分桶表B,同样指定分桶列bucket_col和分桶数量4
CREATE TABLE table_B (
id INT,
data STRING
)
CLUSTERED BY (bucket_col) INTO 4 BUCKETS;
- 插入数据: 将数据插入到分桶表中。Hive会根据分桶列的哈希值将数据均匀地分配到不同的分桶中。
-- 插入数据到分桶表A
INSERT INTO TABLE table_A
SELECT id, value FROM source_data_A;
-- 插入数据到分桶表B
INSERT INTO TABLE table_B
SELECT id, data FROM source_data_B;
- 进行Join操作: 当需要进行Join操作时,如果两个参与Join的表都是分桶表并且使用相同的分桶列,Hive会自动利用分桶信息来进行优化。在Join时,Hive会根据分桶列的哈希值将相同哈希值的数据分配到同一个节点上,从而减少数据的传输和倾斜的问题。
-- 进行基于分桶表的Join操作
SELECT a.id, a.value, b.data
FROM table_A a
JOIN table_B b ON a.id = b.id;
在这个示例中,我们创建了两个分桶表table_A和table_B,分别用于存储两个数据源的数据。然后通过插入数据,将源数据插入到分桶表中。最后,我们进行了一个基于分桶表的Join操作,通过分桶列id来进行Join。由于两个表都是分桶表,Hive会根据分桶列的哈希值将相同哈希值的数据分配到同一个节点上,从而优化Join操作。
请注意,实际使用中需要根据数据的特点和需求来选择分桶列和分桶数量。分桶表的使用需要结合具体场景来考虑,以达到优化查询性能的目的。
分桶表的优势在于,通过合理设置分桶数量和选择适当的分桶列,可以使数据更加均匀地分布在不同的分桶中,从而减轻数据倾斜的影响。但需要注意的是,分桶表并不能完全消除数据倾斜,特别是在数据分布不均匀的情况下,仍然可能会出现倾斜的问题。在实际应用中,还可以结合其他优化技术,如使用Combiner、调整分桶数量、使用随机前缀等,来更全面地解决数据倾斜的影响。
2.6 使用抽样数据进行优化
对于大数据表,可以先对数据进行抽样,分析抽样数据的分布情况,再进行优化,避免全表扫描导致的倾斜。
代码语言:javascript复制// 采样数据
InputSampler.writePartitionFile(job, new InputSampler.RandomSampler(0.1, 10000));
这段代码是在MapReduce程序中使用Hadoop的InputSampler
来采样数据,用于优化数据倾斜问题。具体来说,这段代码的作用是:
InputSampler.writePartitionFile(job, new InputSampler.RandomSampler(0.1, 10000));
:这行代码使用随机采样器来创建一个分区文件。分区文件包含了采样的数据信息以及相应的分区信息,这可以用来指导MapReduce作业在进行Shuffle操作时将数据分配到不同的Reducer上。
在优化数据倾斜时,采样数据的目的是识别哪些数据可能会导致倾斜。通过对数据进行采样,可以分析采样数据的分布情况,进而确定哪些数据量较大或者分布不均匀。在这个例子中,使用了随机采样器,从输入数据中随机选择一定比例的数据(0.1,即10%),并采样的数据量为10000条。
通过分析采样数据,可以有助于识别数据倾斜的情况,从而采取相应的优化策略。例如,可以根据采样数据的分布情况来调整分区策略,使得数据更加均匀地分配到不同的Reducer上,从而减轻数据倾斜问题。
需要注意的是,虽然采样数据可以帮助识别数据倾斜问题,但它并不是解决数据倾斜的唯一方法。在实际应用中,可能还需要结合其他优化策略,如使用Combiner、使用合适的分区键、使用随机前缀等,来更全面地解决数据倾斜的影响。
2.7 过滤倾斜join单独进行join
假设有两个表:orders
和 customers
,其中 orders
表中的 customer_id
列是高基数列,可能导致数据倾斜。我们可以使用过滤倾斜Key单独进行Join的方式来解决这个问题。
下面是一个示例的SQL代码:
代码语言:javascript复制-- 识别倾斜Key
SELECT customer_id
FROM orders
GROUP BY customer_id
HAVING COUNT(*) > 10000; -- 举例,根据实际情况设定阈值
-- 拆分倾斜Key
CREATE TABLE skewed_orders AS
SELECT *
FROM orders
WHERE customer_id IN (identified_skewed_keys);
CREATE TABLE non_skewed_orders AS
SELECT *
FROM orders
WHERE customer_id NOT IN (identified_skewed_keys);
-- 单独处理倾斜Key
CREATE TABLE skewed_result AS
SELECT o.*, c.name
FROM skewed_orders o
JOIN customers c ON o.customer_id = c.customer_id;
CREATE TABLE non_skewed_result AS
SELECT o.*, c.name
FROM non_skewed_orders o
JOIN customers c ON o.customer_id = c.customer_id;
-- 合并结果
CREATE TABLE final_result AS
SELECT * FROM skewed_result
UNION ALL
SELECT * FROM non_skewed_result;
在这个示例中,我们首先识别出可能导致数据倾斜的 customer_id
值。然后,我们根据倾斜和非倾斜的情况,分别创建了两个临时表。接下来,对倾斜数据和非倾斜数据分别进行Join操作,并将结果存储在临时表中。最后,我们通过 UNION ALL 合并了倾斜和非倾斜数据的结果,得到最终的查询结果。
这种方法也适用于处理空值,思路是用where
将空值过滤掉,再使用union all
将带空值的数据进行关联。