一、什么是数据倾斜?
原理:在进行shuffle的时候,须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
表象:
MapReduce任务:主要表现在ruduce阶段卡在99.99%,一直99.99%不能结束,各种container报错OOM
Spark任务:单个Executor执行时间特别久,整体任务卡在某个stage不能结束,Executor lost,OOM,Shuffle过程出错。
二、业内数据倾斜的判断标准?
从执行时间倾斜度和数据倾斜度来观测:(比如执行时间倾斜度、数据量倾斜度均大于 2)
执行时间倾斜度定义为:所有并行节点执行时长的最大值 (Max) 与中位数 (Median) 的比值;(举例:执行时间倾斜 = 5.7mins(最大) - 12s(中位数) )
数据量倾斜度定义为:所有并行节点所分配的数据量的最大值 (Max) 与中位数 (Median) 的比值;(数据量倾斜
= 38MB(最大) - 204.8KB(中位数) )
三、如何解决倾斜?
常见处理方法汇总:
3.1 输入倾斜
方案实现原理:
在读orc表时,spark任务在创建map task时默认使用BI策略,BI策略是以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256M)时使用ETL策略,否则使用BI策略。
解决方案:
指定使用ETL策略:
spark.hadoop.hive.exec.orc.split.strategy=ETL;(该参数只对orc格式生效)
合并小文件:
spark.sql.mergeSmallFileSize=10485760(10M),有效减少map输入端倾斜
spark.hadoopRDD.targetBytesInPartition=67108864; (平台设置为:1M) 合并文件大小为64M
方案优缺点:
当ETL策略生效时,driver读取file footer等信息,若其footer(用于描述整个文件的基本信息、表结构信息、行数、各个字段的统计信息以及各个Stripe的信息)较大,可能会导致driver端OOM,因此这类表的读取建议使用BI策略。对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。
3.2 shuffle倾斜
3.2.1、key倾斜程度轻微
方案实现原理:
增加shuffle read task的数量,让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
解决方案:
spark.sql.shuffle.partitions = 4000 (默认500)
方案优缺点:
实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。只是缓解了数据倾斜而已,没有彻底根除问题,其效果有限。
3.2.2、少数key倾斜严重
方案实现原理:
将导致数据倾斜的少数key过滤之后,这些key就不会参与计算了,自然不可能产生数据倾斜。
解决方案:
在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。
代码块
代码语言:javascript复制where key is not in('bigkey')
方案优缺点:
实现简单,而且效果也很好,可以完全规避掉数据倾斜。适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。
3.2.3、reducebykey等聚合类shuffle算子
方案实现原理:
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
解决方案:
将group by 产生的倾斜key 通过附加随机前缀的方式,进行聚合。
代码语言:sql复制select split(t.activity_id,'_')[1]
from
(select concat(cast(ceiling(rand(1)*10000) as int),'_',activity_id) activity_id -- 将activity_id打散10000倍
from table1
group by 1
) t --1阶段聚合
group by split(t.activity_id,'_')[1]; --2阶段聚合
方案优缺点:
对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,仅仅适用于聚合类的shuffle操作,适用范围相对较窄。
3.2.4、join类导致的key倾斜
3.2.4.1 维表小,将reduce join 变为map join
方案实现原理:
普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小表 map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜
解决方案:将小表进行广播
代码语言:javascript复制set hive.auto.convert.join = true; -- hive是否自动根据文件量大小,选择将common join转成map join
set hive.mapjoin.smalltable.filesize =25000000;
-- 大表小表判断的阈值,如果表的大小小于该值25Mb,则会被判定为小表。则会被加载到内存中运行,将commonjoin转化成mapjoin。一般这个值也就最多几百兆的样子。
代码语言:sql复制 select /* MAPJOIN(b) */
a.poi_id
from table a join b
方案优缺点:
对join操作导致的数据倾斜,效果非常好,这个方案只适用于一个大表和一个小表join的情况。
一般集群开启map join会自动进行广播,对于表是否被广播,需要读取表元数据信息。分区表在matestore里基本都是没有元数据的,取不到的话就走默认值了(取int最大值),临时表在matestore也没有存储表信息。因此对于分区表或者临时表,需要手动指定map join。
方案实现原理:
对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。
解决方案:
将少数倾斜key取出来,并将对应的维表扩容n倍,非倾斜的key正常join
方案优缺点:
对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散倾斜的key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。
如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。
3.2.4.3 大量key倾斜严重,采样随机前缀和扩容RDD
方案实现原理:
将原先相同的key通过附加随机前缀变成不同的key,然后就可以将这些处理后的“不同的key”分散到多个task中去处理,而不是让一个task处理大量的相同key。而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD维表进行数据扩容,对内存资源要求很高。
解决方案:
将倾斜key对应的b表进行扩容n倍。
方案优缺点:
对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。维表会膨胀n倍。运行时间会有影响,可能会变多。
3.3.4.4 大量key倾斜严重,动态一分为二
方案实现原理:
对于倾斜的值和非倾斜的值分开处理,最后union all。
解决方案:
需要临时表存放倾斜的键,将表倾斜值扩容n倍,对于倾斜的key的维表进行mapjoin(广播)或者关联,非倾斜的key正常join
代码语言:sql复制-- 临时表存放买家超过50000的大卖家
INSERT OVERWRITE TABLE temp_b
SELECT t1.seller_id,t2.seller_level
FROM ( SELECT seller_id,count(buyer_id) as buyer_cnt
FROM table_a --- 有购买行为
GROUP BY seller_id
having count(buyer_id)>50000
) t1
inner JOIN
( SELECT seller_id,seller_level
FROM table_b
) t2
ON t1.seller_id = t2.seller_id
-- 对于倾斜的值和非倾斜的值分开处理,最后union all
SELECT buyer_id seller_level,sum(order_num ) as order_num
FROM (
SELECT
/* BROADCAST(t2) */
t1.buyer_id,t1.seller_level,sum(order_num)
FROM table_a t1
LEFT JOIN temp_b t2
ON t1.seller_id = t2.seller_id
group by 1,2
UNION ALL --针对大卖家map join 其他卖家正常join
SELECT t1.buyer_id,t4.seller_level,sum(order_num) order_num
FROM table_a t1
LEFT JOIN
(
SELECT seller_id,seller_level
FROM table_b t2
LEFT JOIN temp_b t3
ON t2.seller_id = t3.seller_id
WHERE t3.seller_id is null
) t4
ON t1.seller_id = t4.seller_id
) t
GROUP BY 1,2;
方案优缺点:
比较通用,自由度高,但是对于代码的更改最大,更改代码框架。
3.3 膨胀倾斜
方案实现原理:
在数据处理中有一种特殊的情况,两个多对多关系的表进行join,会发生数据膨胀。
解决方案:
在数据处理中应该尽可能的避免笛卡尔积,以及热点key的多对多关系。如果业务上确实需要多对多关系,可以从这几点考虑优化
- 能否去掉一些热点的大key
- 能否增加一些关联条件,减少最终的结果数据
- 能否在数据范围上做减少,对于笛卡尔积的关联需要把数据条数控制在1亿以内
- 如果是M*N(M>>N)的多对多关系,可以考虑把小表N广播出去,对于大表M切分成多个很小的数据分片,进行mapjoin