Spark 性能优化指南(官网文档)

2022-05-26 08:33:42 浏览数 (1)

本篇文章翻译之 Tuning Spark。

由于大多数Spark组件基于内存的特性,Spark程序可能会因为集群中的任何资源而导致出现瓶颈:CPU、网络带宽或内存。通常情况下,如果数据适合于放到内存中,那么瓶颈就是网络带宽,但有时,我们还是需要内存进行一些调优的,比如以序列化的形式保存RDDs,以便减少内存占用。

这篇调优指南主要涵盖两个主题:数据序列化和内存调优。数据序列化不仅可以优化网络性能,而且还可以减少内存的使用。

1、数据序列化 - Data Serialization

序列化在任何的分布式应用中都扮演着重要的角色。但是,如果将对象序列化成比较慢的格式,或者耗费大量字节的格式,都会大大降低计算速度。Spark在便利性(允许你使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:

  • Java serialization:默认情况下,Spark使用Java的ObjectOutputStream框架来序列化对象,而且可以使用任何你通过实现java.io.Serializable来创建的类。你还可以通过继承java.io.Externalizable来控制序列化的性能。Java序列化是灵活的,但通常很慢,而且对于很多类会导致大的序列化格式。
  • Kryo serialization:Spark也可以使用Kryo库(version 4)来更快的序列化对象。Kryo明显要比Java序列化更快,更紧凑,但不支持所有序列化类型,并且要求你提前注册你将在程序中使用的类,以获得最佳性能。

如何使用呢?

我们可以通过使用 SparkConf 初始化 job,并调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 来使用 Kryo 序列化。这个设置配置的序列化器不仅可以用于 worker 节点之间的 shuffle 数据,还可以用于序列化到磁盘的 RDDs。Kryo 不是默认值的唯一原因是因为其要自定义注册,但是官方建议在任何大型网络密集计算应用中应该尝试使用它。

从 Spark2.0.0 开始,我们在基于基本数据类型、基本数据类型或字符串类型的数组来 shuffle RDDs 时,使用Kyro序列化器。

Spark 对于包含在 AllScalaRegistrar(Twitter chill library) 中的常用核心Scala类,都自动包含了Kryo序列化器。

使用 registerKryoClasses 方法,向 Kryo 注册您自己的自定义类。下面是示例:

代码语言:javascript复制
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档描述了更高级的注册选项,比如添加自定义的序列化代码。

如果对象很大,我们可能需要增加配置(spark.kryoserializer.buffer)的值。这个值要足够大到能容纳你要序列化的最大的对象。

最后,如果我们没有注册自定义类,Kryo 将仍然生效,但是它将不得不存储每个对象的完整类名,那将会非常浪费。

2、内存调优 - Memory Tuning

调优内存时需要考虑三个因素:对象占用的内存数(可能想要将整个dataset放到内存中),访问这些对象的成本以及垃圾收集的开销。

默认情况下,Java 对象访问速度很快,但是,所消耗的存储空间要比实际的对象多消消耗 2~5 倍的空间。这是为什么呢?有以下几个原因:

  • 不同的Java对象都有一个"对象头",大约是16个字节,并包含指向其类的指针等信息。对于一个只有很少数据的对象(比如一个Int字段),对象头可能会比数据更大。
  • Java 字符串在其原始数据上大约有40个字节的开销(因为它们是将原始数据保存在字符数组中的,并且保存长度等额外的数据),由于字符串内部使用UTF-16编码,所以每个字符都存储为两个字节。因此,一个10字符的字符串可以很容易的消耗60个字节。
  • 通用集合类,如HashMap和LinkedList,使用链式数据结构,其中每个条目(例如Map.Entry)都有一个"wrapper"对象。这个对象不仅有对象头,还有指向列表中下一个对象的指针(通常每个指针8个字节)。
  • 基本数据类型的集合通常将它们存储为装箱对象,如java.lang.Integer。

下面将首先概述 Spark 的内存管理,然后讨论用户可以采取的具体策略,以便更有效地使用应用程序中的内存。我们将描述如何确定对象的内存使用,以及如何改进内存使用——通过改变数据结构,或以序列化格式存储数据。然后,我们将概括调优Spark的缓存大小和Java垃圾收集器。

2.1 内存管理 - Memory Management Overview

Spark 中的内存使用主要分为两类:execution 和 storage。Execution memory 指的是,在 shuffle、join、sort和 aggregation 中用于计算的内存,而storage memory 指的是用来在集群中缓存和传输内部数据的内存。Spark中,execution 和storage 共享一个统一的区域(M)。当没有execution memory被使用时,storage可以获取所有可用内存,反之,如果没有storage memory被使用时,execution也可以获取所有可用的内存。如果在Execution storage不够用时,可驱逐storage区域占用Execution区域的一部分内存,但仅在总的storage memory占用低于某个阈值®之前才会这么做。换句话说,R是M中的一个子区域,是在默认情况下分配给storage的内存,阈值R内缓存的块是永远不会被驱逐的。

这种设计确保了几个重要的特性。首先,不使用缓存的应用程序可以拿整个内存空间给execution用,从而避免不必要的磁盘溢出。其次,如果应用程序确实要使用缓存,可以保留一个最小的storage空间®,这里的数据块不会被驱逐。

虽然有两个相关的配置,但由于默认值已适用于大多数情况,一般用户是不需要调整这两个参数的:

  • spark.memory.fraction 代表统一共享区域M占Java堆内存-300MB的比例(默认是0.6)。剩余40%的空间是留给用户数据结构、Spark内部元数据和防止OMM用的。
  • spark.memory.storageFraction 代表R区域占M区域的比例(默认是0.5)。R中的缓存块时不会被Execution驱逐的。

spark.memory.fraction 的值应满足JVM老年代的堆空间大小。有关详细信息,请参考下面关于高级GC调优的讨论。

2.2 确定内存占用 - Determining Memory Consumption

衡量一个 dataset 所需内存的最好的方法就是创建一个 RDD,将其放入缓存中,然后到web UI中查看"Storage"页面。这个页面会告诉你,这个RDD占用了多少内存。要估计一个特定对象的内存占用,可以使用SizeEstimator的estimate方法,这对于尝试用不同的数据设计来调整内存使用是非常有用的,还可以确定广播变量在每个 executor 上占的堆大小。

2.3 数据结构调优 - Tuning Data Structures

减少内存消耗的第一种方法是,避免那些会增加开销的Java特性,比如基于指针的数据结构和包装对象。有几种方式可以做到这一点:

  1. 设计你的数据结构以优先选择对象数组和基本类型,而不是标准的Java或Scala集合类型(比如HashMap)。fastutil库为与Java标准库兼容的基本类型提供了方便的集合类。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 对于主键字段,考虑使用数字类型的ID或枚举对象来代替字符串。
  4. 如果内存少于32GB,可以设置JVM参数-XX: UseCompressedOops,来使指针由8个字节变为4个字节。您可以在spark-env.sh中添加这个选项。

2.4 序列化RDD存储 - Serialized RDD Storage

当进行了调优之后,对象太大还是无法有效地存储时,一个更简单的减少内存占用的方式就是使用RDD持久化API中的序列化存储级别(比如MEMORY_ONLY_SER)以序列化形式存储对象。Spark将每个RDD分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点就是访问时间慢,由于必须动态地反序列化对个对象。我们强烈建议使用Kryo,如果您想以序列化的形式缓存数据,因为它比Java序列化占用小的多的空间。

2.5 垃圾收集调优 - Garbage Cllection Tuning

当我们的应用程序存储了大量的RDD时,JVM垃圾收集可能会成为问题。

当Java需要驱逐旧对象来为新对象腾出空间时,它将跟踪所有Java对象,并找到未使用的对象。这里要记住的要点是,垃圾收集的成本与Java对象的数据成正比,使用更小对象的数据结构(比如,用int类型的数组代替LinkedList)可以大大降低垃圾收集的成本。

一个更好的方法是以序列化的形式持久化对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。如果存在GC问题,在尝试使用其他技术之前,首先要尝试使用序列化缓存。

由于任务工作内存(运行task所需的内存空间)和缓存在节点上的RDD之间存在冲突,也可能会导致GC问题。我们将讨论如何控制分配给RDD的缓存空间来缓解这种问题。

2.5.1 衡量GC影响 - Measuring the Impact of GC

GC调优的第一步是收集统计垃圾收集的频率和GC所耗费的时间。这可以通过添加Java gc选项-XX: PrintGCDetails和-XX: PrintGCTimeStamps来实现。(有关给Spark job传递Java选项的信息,请查看configuration guide)。在下次Spark job运行时,您将在发生垃圾收集时看到被打印到work检点上的日志信息。注意,这些GC日志是打印在集群的worker节点而不是driver节点。

2.5.2 高级GC调优策略 - Advanced GC Tuning

为了更进一步地调优垃圾收集,我们首先需要了解一些关于JVM内存管理的基本信息:

  • Java堆空间被划分为年轻代和年老代两个区域。年轻代用来保存存活时间短的对象,而年老代保存寿命更长的对象。
  • 年轻代被进一步划分成Eden,Survivor1和Survivor2三个区域。
  • 垃圾收集过程的简单描述:当Eden空间已满时,会在Eden空间触发一次minor GC,然后将Eden和Survivor1中仍然存活的对象复制到Survivor2区域。如果一个对象达到了所设定的最大年龄或者Survivor2区满了,就会将对象移动到年老代。最终,当年老代空间快要满了时,将会触发一次full GC。

Spark中进行GC调优的目标是确保只有存活时间长的RDD存储在年老代,年轻代足以存储存活时间短的对象。这将有助于避免full GC去收集任务执行期间创建的临时对象。下面是一些有用的GC调优方法:

  • 通过收集GC统计信息来检查是否有太多的垃圾收集发生。如果在一个task执行完成之前,触发了多次full GC,这意味着没有足够的内存可用来执行tasks。
  • 如果触发了太多的minor GC,而没有太多major GC,那么为Eden区分配更多内存将会有所帮助。您可以将Eden区的大小设置为高于每个task预估所占用的内存。如果Eden区的大小被确定为E,那么可以使用选项-Xmn=4/3*E来这是年轻代的大小。
  • 在打印的GC统计信息中,如果发现年老代将要满了,则通过降低spark.memory.fraction来减少用于缓存的内存占用;缓存更少的对象比降低task的执行速度要更好。或者,考虑减少年轻代的大小。如果你已经设置了-Xmn的值,这意味着降低它的大小。如果没有设置-Xmn的值,尝试盖面JVM的NewRatio参数的值,许多JVM将这个参数的默认值设为2,这表明年老代占整个堆空间的2/3,它应该足够大,以超过spark.memory.fraction的值。
  • 尝试使用G1GC垃圾收集器-XX: UseG1GC。它可以在垃圾收集成为瓶颈的情况下提高性能。注意,对于那些堆内存大的executor来说,增加G1 的region size(-XX:G1HeapRegionSize)可能很重要。
  • 举个例子,如果您的task是从HDFS读取数据,那么就可以使用从HDFS读取数据的block大小来估计这个task所使用的内存。需要注意的是,block解压缩之后的大小通常是原来的2或3倍。因此,如果我们希望有3或4个task的工作空间,并且HDFS block大小为128MB,我们就可以估算Eden区大小为43128。
  • 监视垃圾收集的频率和时间如何随着设置的变化而变化。

我们的经验表明,GC调优的效果取决于你的应用程序和可用内存的大小。网上有许多调优选项,但是管理full GC发生的频率有助于减少开销。

3、其他优化技巧 - Other Considerations

3.1 任务并行度 - Level of Parallelism

除非为每个操作设置足够高的并行度,否则集群资源不会得到充分利用。Spark根据每个文件的大小自动设置要在每个文件上运行的map task的数量。对于分布式的reduce操作,例如groupByKey和reduceByKey,它使用最大的父RDD的分区数。你可以将并行度作为第二个参数传递,或设置属性spark.default.parallelism来更改默认值。通常,我们建议集群中每个CPU xore执行2-3个task。

3.2 reduce端task内存占用 - Memory Usage of Reduce Tasks

有时候,我们的应用程序发生OOM错误并不是因为RDD无法放入内存中,而是因为其中一个task的工作集太大,例如groupByKey中的一个reduce task数据太多。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个task中构建了一个hash table来执行聚合分组,这通常会包含大量的数据。缓解这种情况最简单的方法就是增加并行度,这样每个task的处理的数据就会变少。Spark可以有效地支持短至200ms的task,因为它可以对许多tasks重用一个executor JVM,而且启动task成本很低,因此你可以安全将并行度增加到集群core数量以上。

3.3 广播大变量 - Broadcasting Large Variables

使用SparkContext中的广播功能可以极大地减少每个序列化task的大小和集群启动job的成本。如果你的task使用了driver端任何的大对象,可以考虑将这些对象转换为广播变量。Spark在master节点打印每个task的序列化大小,因此您可以查看来确定task是否太大,一般来说,大于20KB的task值得去优化。

3.4 数据本地性 - Data Locality

数据所在的位置对Spark作业的性能有很大的影响。如果数据和要处理数据的代码在同一个地方,那么计算速度往往就很快。但是,如果代码和数据不在同一个地方,那么其中一个必须移动到另外一个所在的地方。通常情况下,移动代码比移动数据要快得多,因为代码的大小要比数据小的多。Spark就是根据这种原则来进行调度的。

数据所在的位置就是指数据与处理数据的代码之间的距离。根据数据当前的位置,有几个级别的距离,按顺序从最近到最远:

  • PROCESS_LOCAL 数据和运行代码位于同一个JVM中。这是最好的情况。
  • NODE_LOCAL 数据和运行代码位于同一个节点。这会比PROCESS_LOCAL 慢一点,因为数据要在进程之间传输。
  • NO_PREF 从任何地方访问数据都是一样快的。
  • RACK_LOCAL 数据位于同一个服务器机架上。数据位于同一机架的不同服务器上,因此需要通过网络传输数据,通常是经过一个交换机。
  • ANY 数据位于其他机架上。

Spark会优先调度task在最佳的位置级别,但这并不总是可能的。在任何空闲executor上都没有未处理的数据的情况下,Spark会切换到更低的位置级别。有两种选择:a) 等待CPU空闲下来,在同一服务器上启动一个task,或b) 立即在远端启动一个task,并要求将数据移动到那里。 Spark通常的策略就是,先等待一段时间,希望繁忙的CPU能得到释放,一旦超过指定时间,就开始将数据从远端移动到空闲的CPU。每个位置级别之间的超时时间都可以单独配置,也可以全部配置在一个参数中。关于spark.locality参数的详细信息,请查看configuration page。如果您的tasks运行时间很长并且位置级别很差,那么可以增加配置的值,但是默认的设置通常就能满足多数的情况。

4、总结 - Summary

这篇简短的调优指南指出了在调优Spark应用程序时,应该关注的主要的点——最重要的是数据序列化和内存调优。对于大多数应用程序,切换到Kryo序列化,并以序列化的形式持久化数据就能解决大多数常见的性能问题。

参考

Tuning Spark

0 人点赞