Spark中广播变量详解以及如何动态更新广播变量

2020-08-10 14:43:56 浏览数 (1)

前言:Spark目前提供了两种有限定类型的共享变量:广播变量和累加器,今天主要介绍一下基于Spark2.4版本的广播变量。先前的版本比如Spark2.1之前的广播变量有两种实现:HttpBroadcast和TorrentBroadcast,但是鉴于HttpBroadcast有各种弊端,目前已经舍弃这种实现,本篇文章也主要阐述TorrentBroadcast】

广播变量概述

广播变量是一个只读变量,通过它我们可以将一些共享数据集或者大变量缓存在Spark集群中的各个机器上而不用每个task都需要copy一个副本,后续计算可以重复使用,减少了数据传输时网络带宽的使用,提高效率。相比于Hadoop的分布式缓存,广播的内容可以跨作业共享。

广播变量要求广播的数据不可变、不能太大但也不能太小(一般几十M以上)、可被序列化和反序列化、并且必须在driver端声明广播变量,适用于广播多个stage公用的数据,存储级别目前是MEMORY_AND_DISK。

广播变量存储目前基于Spark实现的BlockManager分布式存储系统,Spark中的shuffle数据、加载HDFS数据时切分过来的block块都存储在BlockManager中,不是今天的讨论点,这里先不做详述了。

广播变量的创建方式和获取

代码语言:javascript复制
//创建广播变量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

//获取广播变量
broadcastVar.value

广播变量实例化过程

1.首先调用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

2.调用BroadcastManager的newBroadcast方法

代码语言:javascript复制
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

3.通过广播工厂的newBroadcast方法进行创建

代码语言:javascript复制
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())

在调用BroadcastManager的newBroadcast方法时已完成对广播工厂的初始化(initialize方法),我们只需看BroadcastFactory的实现TorrentBroadcastFactory中对TorrentBroadcast的实例化过程:

代码语言:javascript复制
new TorrentBroadcast[T](value_, id)

4.在构建TorrentBroadcast时,将广播的数据写入BlockManager

1)首先会将广播变量序列化后的对象划分为多个block块,存储在driver端的BlockManager,这样运行在driver端的task就不用创建广播变量的副本了(具体可以查看TorrentBroadcast的writeBlocks方法)

2)每个executor在获取广播变量时首先从本地的BlockManager获取。获取不到就会从driver或者其他的executor上获取,获取之后,会将获取到的数据保存在自己的BlockManager中

3)块的大小默认4M

代码语言:javascript复制
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024

广播变量初始化过程

1.首先调用broadcastVar.value

2.TorrentBroadcast中lazy变量_value进行初始化,调用readBroadcastBlock()

3.先从缓存中读取,对结果进行模式匹配,匹配成功的直接返回

4.读取不到通过readBlocks()进行读取

从driver端或者其他的executor中读取,将读取的对象存储到本地,并存于缓存中

代码语言:javascript复制
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)

Spark两种广播变量对比

正如【前言】中所说,HttpBroadcast在Spark后续的版本中已经被废弃,但考虑到部分公司用的Spark版本较低,面试中仍有可能问到两种实现的相关问题,这里简单介绍一下:

HttpBroadcast会在driver端的BlockManager里面存储广播变量对象,并且将该广播变量序列化写入文件中去。所有获取广播数据请求都在driver端,所以存在单点故障和网络IO性能问题。

TorrentBroadcast会在driver端的BlockManager里面存储广播变量对象,并将广播对象分割成若干序列化block块(默认4M),存储于BlockManager。小的block存储位置信息,存储于Driver端的BlockManagerMaster。数据请求并非集中于driver端,避免了单点故障和driver端网络磁盘IO过高。

TorrentBroadcast在executor端存储一个对象的同时会将获取的block存储于BlockManager,并向driver端的BlockManager汇报block的存储信息。

请求数据的时候会先获取block的所有存储位置信息,并且是随机的在所有存储了该executor的BlockManager去获取,避免了数据请求服务集中于一点。

总之就是HttpBroadcast导致获取广播变量的请求集中于driver端,容易引起driver端单点故障,网络IO过高影响性能等问题,而TorrentBroadcast获取广播变量的请求服务即可以请求到driver端也可以在executor,避免了上述问题,当然这只是主要的优化点。

动态更新广播变量

通过上面的介绍,大家都知道广播变量是只读的,那么在Spark流式处理中如何进行动态更新广播变量?

既然无法更新,那么只能动态生成,应用场景有实时风控中根据业务情况调整规则库、实时日志ETL服务中获取最新的日志格式以及字段变更等。

代码语言:javascript复制
@volatile private var instance: Broadcast[Array[Int]] = null

//获取广播变量单例对象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
  if (instance == null) {
    synchronized {
      if (instance == null) {
        instance = sc.broadcast(fetchLastestData())
      }
    }
  }
  instance
}

//加载要广播的数据,并更新广播变量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
  if (instance != null) {
    //删除缓存在executors上的广播副本,并可选择是否在删除完成后进行block等待
    //底层可选择是否将driver端的广播副本也删除
    instance.unpersist(blocking)
    
    instance = sc.broadcast(fetchLastestData())
  }
}

def fetchLastestData() = {
  //动态获取需要更新的数据
  //这里是伪代码
  Array(1, 2, 3)
}
代码语言:javascript复制
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

...
...

stream.foreachRDD { rdd =>
  val current_time = dataFormat.format(new Date())
  val new_time = current_time.substring(14, 16).toLong
  //每10分钟更新一次
  if (new_time % 10 == 0) {
    updateBroadCastVar(rdd.sparkContext, true)
  }

  rdd.foreachPartition { records =>
    instance.value
    ...
  }
}

注意:上述是给出了一个实现思路的伪代码,实际生产中还需要进行一定的优化。

此外,这种方式有一定的弊端,就是广播的数据因为是周期性更新,所以存在一定的滞后性。广播的周期不能太短,要考虑外部存储要广播数据的存储系统的压力。具体的还要看具体的业务场景,如果对实时性要求不是特别高的话,可以采取这种,当然也可以参考Flink是如何实现动态广播的。

Spark流式程序中为何使用单例模式

1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销

2.广播变量单例模式也需要做同步处理。在FIFO调度模式下,基本不会发生并发问题。但是如果你改变了调度模式,如采用公平调度模式,同时设置Spark流式程序并行执行的job数大于1,如设置参数spark.streaming.concurrentJobs=4,则必须加上同步代码

3.在多个输出流共享广播变量的情况下,同时配置了公平调度模式,也会产生并发问题。建议在foreachRDD或者transform中使用局部变量进行广播,避免在公平调度模式下不同job之间产生影响。

除了广播变量,累加器也是一样。在Spark流式组件如Spark Streaming底层,每个输出流都会产生一个job,形成一个job集合提交到线程池里并发执行,详细的内容在后续介绍Spark Streaming、Structured Streaming时再做详细阐述。

0 人点赞