Spark 的性能分析和调优很有意思,今天再写一篇。主要话题是 shuffle,当然也牵涉一些其他代码上的小把戏。
以前写过一篇文章,比较了几种不同场景的性能优化,包括 portal 的性能优化,web service 的性能优化,还有 Spark job 的性能优化。Spark 的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用 Spark 来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等。事实上,我们都知道没有银弹,但是每一种性能优化场景都有一些特定的 “大 boss”,通常抓住和解决大 boss 以后,能解决其中一大部分问题。比如对于 portal 来说,是页面静态化,对于 web service 来说,是高并发(当然,这两种可以说并不确切,这只是针对我参与的项目总结的经验而已),而对于 Spark 来说,这个大 boss 就是 shuffle。
首先要明确什么是 shuffle。Shuffle 指的是从 map 阶段到 reduce 阶段转换的时候,即 map 的 output 向着 reduce 的 input 映射的时候,并非节点一一对应的,即干 map 工作的 slave A,它的输出可能要分散跑到 reduce 节点 A、B、C、D …… X、Y、Z 去,就好像 shuffle 的字面意思 “洗牌” 一样,这些 map 的输出数据要打散然后根据新的路由算法(比如对 key 进行某种 hash 算法),发送到不同的 reduce 节点上去。(下面这幅图来自 《Spark Architecture: Shuffle》)
为什么说 shuffle 是 Spark job 的大 boss,就是因为 Spark 本身的计算通常都是在内存中完成的,比如这样一个 map 结构的 RDD:(String, Seq),key 是字符串,value 是一个 Seq,如果只是对 value 进行一一映射的 map 操作,比如(1)先计算 Seq 的长度,(2)再把这个长度作为元素添加到 Seq 里面去。这两步计算,都可以在 local 完成,而事实上也是在内存中操作完成的,换言之,不需要跑到别的 node 上去拿数据,因此执行的速度是非常快的。但是,如果对于一个大的 rdd,shuffle 发生的时候,就会因为网络传输、数据序列化/反序列化产生大量的磁盘 IO 和 CPU 开销。这个性能上的损失是非常巨大的。
要减少 shuffle 的开销,主要有两个思路:
- 减少 shuffle 次数,尽量不改变 key,把数据处理在 local 完成;
- 减少 shuffle 的数据规模。
先去重,再合并
比如有 A、B 这样两个规模比较大的 RDD,如果各自内部有大量重复,那么二者一合并,再去重:
代码语言:javascript复制A.union(B).distinct()
这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小 shuffle 的开销(注意 Spark 的默认 union 和 Oracle 里面的 “union all” 很像——不去重):
代码语言:javascript复制A.distinct().union(B.distinct()).distinct()
看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从 3 个小时减到 20 分钟。
如果中间结果 rdd 如果被调用多次,可以显式调用 cache() 和 persist(),以告知 Spark,保留当前 rdd。当然,即便不这么做,Spark 依然存放不久前计算过的结果(以下来自官方指南):
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
数据量大,并不一定慢。通常情况下,由于 Spark 的 job 是放到内存里面进行运算的,因此一个复杂的 map 操作不一定执行起来很慢。但是如果牵涉到 shuffle,这里面有网络传输和序列化的问题,就有可能非常慢。
类似地,还有 filter 等等操作,目的也是要先对大的 RDD 进行 “瘦身” 操作,然后在做其他操作。
mapValues 比 map 好
明确 key 不会变的 map,就用 mapValues 来替代,因为这样可以保证 Spark 不会 shuffle 你的数据:
代码语言:javascript复制A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}
改成:
代码语言:javascript复制A.mapValues{case ((B, C), (D, E)) => (B, C, E)}
用 broadcast filter 来代替 join
这种优化是一种特定场景的神器,就是拿大的 RDD A 去 join 一个小的 RDD B,比如有这样两个 RDD:
- A 的结构为 (name, age, sex),表示全国人民的 RDD,超大
- B 的结果为 (age, title),表示 “年龄 -> 称号” 的映射,比如 60 岁有称号 “花甲之年”,70 岁则是 “古稀之年”,这个 RDD 显然很小,因为人的年龄范围在 0~200 岁之间,而且有的 “年龄” 还没有 “称号”
现在我要从全国人民中找出这些有称号的人来。如果直接写成:
代码语言:javascript复制A.map{case (name, age, sex) => (age, (name, sex))}
.join(B)
.map{case (age, ((name, sex), title)) => (name, age, sex)}
你就可以想象,执行的时候超大的 A 被打散和分发到各个节点去。而且更要命的是,为了恢复一开始的 (name, age, sex) 的结构,又做了一次 map,而这次 map 一样导致 shuffle。两次 shuffle,太疯狂了。但是如果这样写:
代码语言:javascript复制val b = sc.broadcast(B.collectAsMap)
A.filter{case (name, age, sex) => b.values.contains(age)}
一次 shuffle 都没有,A 老老实实待着不动,等着全量的 B 被分发过来。
另外,在 Spark SQL 里面直接有 BroadcastHashJoin,也是把小的 rdd 广播出去。
不均匀的 shuffle
在工作中遇到这样一个问题,需要转换成这样一个非常巨大的 RDD A,结构是 (countryId, product),key 是国家 id,value 是商品的具体信息。当时在 shuffle 的时候,这个 hash 算法是根据 key 来选择节点的,但是事实上这个 countryId 的分布是极其不均匀的,大部分商品都在美国(countryId=1),于是我们通过 Ganglia 看到,其中一台 slave 的 CPU 特别高,计算全部聚集到那一台去了。
找到原因以后,问题解决就容易了,要么避免这个 shuffle,要么改进一下 key,让它的 shuffle 能够均匀分布(比如可以拿 countryId 商品名称的 tuple 作 key,甚至生成一个随机串)。
明确哪些操作必须在 master 完成
如果想打印一些东西到 stdout 里去:
代码语言:javascript复制A.foreach(println)
想把 RDD 的内容逐条打印出来,但是结果却没有出现在 stdout 里面,因为这一步操作被放到 slave 上面去执行了。其实只需要 collect 一下,这些内容就被加载到 master 的内存中打印了:
代码语言:javascript复制A.collect.foreach(println)
再比如,如果遇到 RDD 操作嵌套的情况,通常考虑优化掉,因为只有 master 才能去理解和执行 RDD 的操作,slave 只能处理被分配的 task 而已。比如:
代码语言:javascript复制A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}
就可以用 join 来代替:
代码语言:javascript复制A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}
用 reduceByKey 代替 groupByKey
这一条应该是比较经典的了。reduceByKey 会在当前节点(local)中做 reduce 操作,也就是说,会在 shuffle 前,尽可能地减小数据量。而 groupByKey 则不是,它会不做任何处理而直接去 shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为 reduceByKey 要求参与运算的 value,并且和输出的 value 类型要一样,但是 groupByKey 则没有这个要求。
有一些类似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。
另外,还有一条类似的是用 treeReduce 来代替 reduce,主要是用于单个 reduce 操作开销比较大,可以条件 treeReduce 的深度来控制每次 reduce 的规模。
文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》