建议收藏!详细解析如何对spark进行全方位的调优

2021-08-16 18:09:19 浏览数 (1)

前言:

Apache Spark 是专为大数据处理而设计的快速的计算引擎,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是—spark的输出结果可以保存在内存中,不用再进行HDFS的读写,因此Spark被广泛用于机器学习跟需要迭代计算类的算法。但是面对大量需要处理的数据,要让Spark稳定快速的运行,这就需要对Spark进行全方位的调优,从而在工作中拥有更高的处理效率。本篇文章主要对Spark如何进行全方位的调优进行阐述

主要从下面几点对Spark进行调优:

1.避免RDD重复创建

RDD是一个编程模型,是一种容错的,并行的数据结构,可以让用户显示的将数据储存在磁盘与内存中,并且可以控制数据的分区。RDD一个很重要的特性就是可以相互依赖,如果RDD的每个分区只可以被一个子RDD分区使用,则称之为窄依赖,可以被多个RDD分区使用则称之为宽依赖。我们在进行一个Spark作业的时候,一般会读取一个数据源作为一个初始的RDD,之后以此RDD为开始得到后面需要的RDD,形成一个RDD关系链。

在进行RDD创建的时候要避免RDD的重复创建,也就是不要对一份数据进行创建多个相同的RDD。重复创建RDD会对Spark带来更大的性能开销,如下:

代码语言:javascript复制
//错误的创建RDD的方式
val rdd1 = sc.textFile("hdfs://localhost:9000/test.txt")
rdd1.map()
val rdd2 = sc.textFile("hdfs://localhost:9000/test.txt")
rdd2.reduce()

//正确的RDD创建方式形成一条RDD链
val rdd1 = sc.textFile("hdfs://localhost:9000/test.txt")
val rdd2=rdd1.map()
rdd2.reduce()
2.日志收集:

在执行作业的时候报错可以说是每一个技术人员都会遇到的事情,Spark提供的作业日志就可以很好的帮助我们对出现的问题进行定位。Spark日志通常是排错的唯一根据。一般的报错我们可以从Spark的Driver日志中进行定位。也可以通过yarn-client的模式执行,将日志输出到客户端,方便我们进行查看,如下:

代码语言:javascript复制
./bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master yarn 
--deploy-mode client 
--executor-memory 20G 
--num-executors 50 
/app/spark-2.2/examples/jars/xx.jar  //这里指定自己路径的jar包

根据运行的ID号可以查看到日志

使用这种方式进行报错日志的定位往往是最有效的解决问题的办法。

3.提高Shuffle性能

Shuffle表示数据从Map Task输出到Reduce Task输入的这段过程。shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。同时,Shuffle也是Spark进行作业的时候很关键的一个环节,也是对Spark进行性能调优的一个重点,下面是Spark进行词频统计作业时候的Map Reduce的过程

Spark中与Shuffle性能有关的参数:

代码语言:javascript复制
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.compress

1.第一个配置是Map端输出结果的缓冲区大小。

2.第二个配置是Map端输出结果文件的大小。

3.第三个配置是Map端是否开启压缩

第一个配置当然是越高越好,缓冲区越大,数据写入的性能可想而知也是会越高的,所以如果机器条件优越的情况下,这个可以尽可能的调大,来提高Shuffle性能,在Reduce的过程中Reduce的Task所在的位置会按照spark.reducer.maxSizeInFlight的配置大小去拉取文件,之后用内存缓冲区来接收,所以提高spark.reducer.maxSizeInFlight的参数大小也是可以提高Shuffle的效率的。第三个配置一般都是默认开启的,默认对Map端的输出进行压缩操作。

4.Spark作业并行程度

在Spark作业进行的时候,提高Spark作业的并行程度是提高运行效率的最有效的办法。那么我们应该要明确spark中的并行度是指什么?spark中的并行度指的就是各个stage里面task的数量。

代码语言:javascript复制
spark.default.parallelism

textfile()

可以根据地2个参数来设置该作业的并行度。Spark任务的RDD一开始的分区数量时与HDFS上的数据块数量保持一致的,通过coalesce 与 repartition 算子可以进行重分区,但是这个操作并不可以改变Rdeduce的分区数,改变的只是Map端的分区数量,想要对Reduce端的分区数量进行修改,就可以对spark.default.parallelism配置进行修改。通过在官网的描述中,设置的并行度为这个application 中cpu-core数量的2到3倍为最优。

5.内存管理

Spark作业中内存的主要用途就是计算跟储存。Spark在执行程序的时候,集群就会启动Driver和Executor两种JVM进程,Driver为主控进程,Executor负责执行具体的计算任务。 spark的进程是JVM进程,所以Executor的内存管理是建立在 JVM 的内存管理之上,所以还涉及到了堆的概念,内存受到 JVM 统一管理这一点就导致spark释放内存的时候收到限制,所以Spark引入了堆外内存。

只要是在Executor内运行的任务一律共享 JVM 堆内存,按照用途主要可以分为三大类:Storage负责缓存数据和广播变量数据,Execution负责执行Shuffle过程中占用的内存,剩下空间则是储存Spark内部的对象实例。Spark虽然不可以精准的对堆内存进行控制,但是通过决定是否要在储存的内存里面缓存新的RDD,是否为新的任务分配执行内存,也可以提高内存的利用率,相关的参数配置如下:

代码语言:javascript复制
spark.memory.fraction
spark.memory.storageFraction

更改参数配置spark.memory.fraction可调整storage executor总共占内存的百分比,更改配置spark.memory.storageFraction可调整storage占二者内存和的百分比,这两个参数一般使用默认值就可以满足我们绝大部分的作业的要求了。

再说说Spark的堆外内存,为了提高Spark内存的使用以及提高Shuffle时的效率,Spark引入了堆外(Off-heap)内存。在默认的情况下堆外内存是不会启用的,可以通过如下参数进行开启:

代码语言:javascript复制
spark.memory.offHeap.enabled

Spark Executor可以通过参数spark.yarn.executor.memoryoverhead 进行配置,最小为 384MB,默认为 Executor 内存的 10%。配置堆外内存大小的参数为spark.memory.offHeap.size,堆外内存与堆内存的划分方式其实是相同的,用户需要知道每个部分的大小如何调节,才能针对场景进行调优,这个对于普通用户来说其实不是特别的友好。

6.Spark数据缓存

Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。RDD通过persist方法或cache方法可以将前面的计算结果进行缓存,但是要注意的是并不会马上进行缓存,而是触发后面的action动作的时候,RDD才会被缓存在计算节点的内存中。如果某一份数据要被重复使用的时候,就可以使用cache算子进行缓存,可以达到很不错的效果

7.对filter产生的分区进行聚合

Spark 的filter算子主要用于数据过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。在执行这个算子的时候数据一般会被拆分成多个分区,这些分区也会影响到后面的计算,所以在执行这个算子的时候用 coalesce 算子进行一次合并,也可以对作业的执行速度达到提升。

8.处理数据倾斜

数据倾斜是数据处理作业中一个很常见的问题。 正常情况下,数据通常都会出现数据倾斜的问题,只不过严重程度不一。数据倾斜的症状是大量数据集中到一个或者几个任务里,导致这几个任务计算大量数据,拖慢整个作业的执行速度,这里给大家详细分析一下数据倾斜是怎么出现的。

我们都知道在执行shuffle操作的时候,程序是按照key进行统计聚合,来进行values的数据的输出、拉取和聚合的。同一个key的values,会被程序分配到一个Reduce task进行处理。但是这个时候如果你处理的是大量的数据,问题可能就要出现了。大量数据必定有多个Key,多个Key对应的values,举个例子假如一共100万数据。可能程序执行后,某个聚合的key对应了98万数据,这些数据全部被分配到一个task上去面去执行。另外两个task,可能各分配到了1万数据。也可能是几百个key被对应到了剩余的两万条数据,这个时候数据倾斜的问题就出现了,而且对于Spark作业的整体性能来说是及其不乐观的。场景如下:

对应数据发生倾斜的情况,可以采用如下几种解决办法:

1.对源数据进行聚合

Spark中一些用于聚合操作的算子,比如groupByKey、reduceByKey,这些算子都是要去拿到每个key对应的values进行计算的。在一些大数据量的计算中,我们可以找到数据的一些维度进行一步聚合,比如说是时间维度的年月日,城市的地区等等,聚合了第一个维度之火再进行下一步的聚合

2.对脏数据进行首先过滤

对应源数据处理中,必定是会存在很多脏数据,这个也是导致数据倾斜的重要原因之一,这个时候我们需要第一步将脏数据进行过滤

3.使用广播变量

在作业进行连接操作的时候,我们可以将小表通过广播变量进行广播,这样可以避免Shuffle过程,让数据相对比较均匀的分布在Map任务。

4.提高作业的并行度

这个方式在前面我们也说到过如何进行参数配置,但是要注意的是,这个配置只是提高浏览作业的运行速度,但是并不能从根本上解决数据倾斜的问题。

5.使用随机Key进行双重聚合

groupByKey、reduceByKey比较适合使用这种方式。join操作通常不会这样来做。

到这里,相信大家对与Spark如何进行调优也有了全新的认识!

0 人点赞