如何调优Spark Steraming

2021-05-31 10:52:30 浏览数 (1)

云计算和大数据密不可分,这里有必要详细讨论下我的老本行——大数据领域。未来几年,我们将很荣幸地见证大数据技术的容器化。首先我们用几篇文章深入地了解一下大数据领域的相关技术。

1. 背景和简介

Spark Streaming是Spark的一个组件,它把流处理当作离散微批处理,被称为离散流或DStream。Spark的核心是RDD,即弹性分布式数据集。RDD本质上是将数据分区(Partition)封装起来。而DStream是一个由时间驱动、逻辑封装的RDD。以下面这段代码为例:

代码语言:javascript复制
def main(args: Array[String]): Unit = {
 
    val logger = LoggerFactory.getLogger(PreJoin.getClass)
 
 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
 
    val conf = new SparkConf().setMaster("local[2]").setAppName("PreJoin")
 
 //    val conf = new SparkConf().setAppName("PreJoin")
 
 /** 限制每秒钟从topic的每个partition最多消费的消息条数 */
 
    conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
 
    conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
 
 // 设置每五秒更新一下
 
    val ssc = new StreamingContext(conf, Seconds(5))
 
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
 
 import sparkSession.implicits._
 
 var zookeeperservers = ""
 
 //  hbase的相关配置
 
    val config: Configuration = HBaseConfiguration.create()
 
    val newAPIJobConfiguration = Job.getInstance(config)
 
 // 读取Kafka
 
    val topics = Array("result1")
 
    val stream = KafkaUtils.createDirectStream[String, String](
 
      ssc,
 
 PreferConsistent,
 
 Subscribe[String, String](topics, kafkaParams)
 
 )
 
 // 读取另外两个topic
 


 
 // 每30秒一个滑动窗口处理30秒内到达的数据,将Json数据解析出来
 
    val resultJson = stream.map(record => record.value).window(Seconds(30), Seconds(30)).transform(rdd => {
 
 ...
 
 })
 
 // resultJson2和resultJson3类似
 
    val resultJson2 =
 
    val resultJson3 = 
 
 // 合并流
 
    val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))
 
 // 遍历每个RDD,存入HBase
 
    resultJsonAll.foreachRDD(rdd => {
 
      val hbasePuts3 = data3.rdd.map((row: Row) => {
 
 ...
 
 })
 
      hbasePuts3.saveAsNewAPIHadoopDataset(newAPIJobConfiguration3.getConfiguration)
 
 })
 
    ssc.start() // 开始计算
 
    ssc.awaitTermination()
 
 } // 等待停止计算
 

上面这段代码描述了一个典型的Spark Streaming 的处理流程。它的功能是从Kafka拉取数据,经过一系列的转换,将结果存入HBase。我们可以看到流处理应用程序和批处理应用程序的一些区别。批处理应用程序拥有清晰的生命周期,它们一旦处理了输入文件就完成了执行。而上面的流处理应用程序的执行没有开始和停止的标记。

几个决定Spark Streaming应用程序生命周期的方法

方法

描述

start()

开始执行应用程序

awaitTermination()

等待应用程序终止

stop()

强制应用程序停止执行

一个Spark应用程序的执行过程如下图

Yarn-Cluster运行模式执行过程

spark 控制进程

守护进程(Daemon)

描述

Driver(驱动程序)

包含SparkContext实例的应用程序入口点

Master(主进程)

负责调度和资源编排

Worker(子进程)

负责节点状态和运行执行器

Executor(执行器)

根据作业分配,负责执行该作业派发的任务

为了减少网络流量,强烈建议在集群机器上运行驱动程序,例如在Master节点,特别是需要驱动程序从Worker中提取数据的情况。

Spark分层执行结构

实体

描述

Application(应用程序)

SparkContext的一个实例

Job(作业)

一个Action后执行的一组阶段

Stage(阶段)

在shuffle内的一组转换

Task set(任务组)

来自同一组阶段的任务组

Task(任务)

一个阶段里的执行单元

有了上面的背景,我们下面便从几个方面来讨论下Spark Streaming的优化。

2. 调优

2.1 并行化

2.1.1 执行器Executor

num-executors

执行器是一个在每个Worker上执行的JVM进程。那么如何选择执行器的数量呢?理论上来说,既然executor是JVM进程,应该多一点才好。但是我们在选择executor数量的时候,有几条经验可供参考:

  • 为每个节点上的操作系统和其他服务留出一些资源
  • 如果在YARN上运行,也占用应用程序Master

executor-memory

该参数用于设置每个Executor进程的内存,Executor内存的大小,很多时候直接决定了Spark作业的性能。根据团队的资源队列的最大内存限制是多少, num-executors乘以 executor-memory,不能超过队列的最大内存量。建议申请的内存量最好不要超过资源队列最大总内存的1/3~1/2。

executor-cores

该参数置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。根据自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。建议 num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右。

2.1.2 任务(Task)

Spark中的task是执行的单元。任务以线程而不是执行器 的进程执行。每个DStream由RDD组成,而RDD又由分区组成。每个分区是一块独立的数据,由一个任务操作。因为一个RDD中的分区数与任务数之间存在几乎一对一的映射。也就是说,DStream并行度是分区数的函数。该分区数取决于依赖关系类型:跨过DStream时如果落在窄依赖区,分区数保持不变,但经过shuffle区由于宽依赖的缘故,这个分区数会发生改变。

shuffle的分区数由 spark.default.parallelism决定,或者如果 spark.default.parallelism未设置,则由构成父DStream的RDD中的最大分区数决定。

实现完全优化的并行度的最佳方法,就是不断试错,和常规Spark应用的调优的方法一样,控制逐渐增加分区的个数,每次将分区数乘以1.5,直到性能停止改进位置。这可以通过Spark UI 进行校准。

综上从Executor和Task的角度,得到Spark Streaming 的一些优化方法,提交Spark作业的脚本大概为:

代码语言:javascript复制
 ./spark-submit 
 
 --master yarn 
 
 --num-executors 30 
 
 --executor-memory 6G 
 
 --executor-cores 2 
 
 --driver-memory 1G 
 
 --conf spark.default.parallelism=120 
 
 --conf spark.storage.memoryFraction=0.5 
 
 --conf spark.shuffle.memoryFraction=0.3 
 
 --class com.example.PreJoin Stream-1.0-SNAPSHOT-jar-with-dependencies.jar
 
2.1.3 创建更多的输入DStream和Receive

每个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个DStream达到接收多个数据流的效果。

比如,一个接收多个Kafka Topic的输入DStream,可以拆分成多个输入DStream,每个分别接收一个topic的数据。这样就会创建多个Receiver,从而并行地接收数据,提高吞吐量。然后多个DStream使用union算子进行合并,从而形成一个DStream。核心代码如下:

代码语言:javascript复制
// 合并流
 
val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))
 

2.2 批处理间隔

批处理间隔是另一个直接影响性能的重要因素。它确定了微批处理的周期,也就是规定了每个微批处理能够通过的数据量。批处理间隔设置得太高则每个批处理会有高延迟,设置得太低则导致资源利用不足。

理想的状况是能够以线路速率处理数据,例如数据源每300毫秒发送一次,那么我们也可以这样假设:处理管道数据的延迟时间也为300毫秒。

如何设置批处理间隔,最好采取的策略是每次试验都从高值开始,比如1.5倍。Spark日志可用于计算系统的稳定性,即批处理间隔能否跟上数据速率。在日志中查找 Totaldelay总延迟。如果此值保持接近批处理间隔,则系统是稳定的。否则尝试增加2.1所述的并行化来减少管道的延迟

假如日志如下:

代码语言:javascript复制
JobScheduler: Total delay: 2.236 s for time 1581859740000 ms (execution: 2.256 s)
 

这表示该批处理的总延迟为3秒,小于批处理间隔。假设在某些时候可能会出现数据峰值,那么5秒是个不错的值。此外还可以通过Spark UI了解每阶段的延迟细目。Spark UI我们会在另一篇文章详细介绍。

2.3 内存

RDD基于内存计算,在内存中缓存所有内容,会给堆和垃圾收集器增加很大压力。如果应用程序的堆空间不足,可以增加 spark.executor.memory。此外有一些情况,Spark还会使用堆外内存,例如Java NIO采用的字节缓冲区。在YARN上,这个额外的内存分配由 spark.yarn.executor.memoryOverhead处理,默认值为 max(executorMemory*0.10384)。如果应用程序使用大量的堆外内存,那么应该增加这个因子。

一般来说,增加堆大小或堆外内存属于最后才会考虑的操作。我们首要的目标是减少应用程序的内存占用。下面介绍实现这一目标的三种方法。

2.3.1 序列化

RDD以序列化形式保存在内存中,可以减少内存使用并改善垃圾收集。默认情况下Spark使用Java序列化,这并不是很高效。Spark支持Kryo,Kryo更有效且性能高,可以将 spark.serializer设置为 org.apache.spark.serializer.KryoSerializer来启用Kryo。

2.3.2 压缩

除了序列化RDD之外。还可以将 spark.rdd.compress设置为true来进行压缩。

2.3.3 垃圾收集

流处理应用程序大量的对象增加了JVM垃圾收集的压力,频繁的GC会增加程序的延迟。建议对驱动程序和执行器使用CMS垃圾收集器,与应用程序同时运行垃圾收集来缩短暂停时间。

通过传递 --driver-java-options-XX: UseConcMarkSweepGCspark-submit,为驱动程序启动CMS。对于执行器,将参数 spark.executor.extraJavaOptions设置为 XX: UseConcMarkSweepGC,来启用CMS垃圾收集。

2.4 Shuffle

每次触发shuffle都会在集群中来回复制数据,这将付出很高的磁盘和网络I/O开销。因此在设计流应用程序的时候应该遵循一些原则:

2.4.1 提前投影过滤

提前进行投影和过滤,可以减少下游算子处理的数据。

2.4.2 多使用Combiner

Combiner使用的是map端聚合,可以减少在shuffle过程中需要处理的数据量。如使用reduceByKey( )可以在shuffle之前的分区级别启用本地聚合。

2.4.2 大量运用并行化

shuffle操作内部使用分组操作的Hash映射来对分区空间进行分隔,这可能会导致堆空间耗尽。通过增加*ByKey()任务的的并行度,减少其工作集来避免这种情况。

2.4.3 文件合并

在大量shuffle任务的情况下,合并中间文件以改善磁盘查找是很有用的。可以设置 spark.shuffle.consolidateFilestrue,启用合并。

2.4.4 更多内存

RDD,shuffle和应用程序对象之间共用执行器Java堆。默认情况下,RDD使用内存的60%( spark.storage.memoryFraction),shuffle使用20%( spark.shuffle.memoryFraction)。过多地使用将使shuffle聚合阶段的数据溢出到磁盘。如果使用shuffle比较多,则可以适当增加shuffle内存的的占用比例,以减少对磁盘的溢出次数。

Shuffle所涉及的问题比较复杂,调优的点也很多,我们会在另一篇文章详细介绍。

0 人点赞