云计算和大数据密不可分,这里有必要详细讨论下我的老本行——大数据领域。未来几年,我们将很荣幸地见证大数据技术的容器化。首先我们用几篇文章深入地了解一下大数据领域的相关技术。
1. 背景和简介
Spark Streaming是Spark的一个组件,它把流处理当作离散微批处理,被称为离散流或DStream。Spark的核心是RDD,即弹性分布式数据集。RDD本质上是将数据分区(Partition)封装起来。而DStream是一个由时间驱动、逻辑封装的RDD。以下面这段代码为例:
代码语言:javascript复制def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(PreJoin.getClass)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[2]").setAppName("PreJoin")
// val conf = new SparkConf().setAppName("PreJoin")
/** 限制每秒钟从topic的每个partition最多消费的消息条数 */
conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
// 设置每五秒更新一下
val ssc = new StreamingContext(conf, Seconds(5))
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
var zookeeperservers = ""
// hbase的相关配置
val config: Configuration = HBaseConfiguration.create()
val newAPIJobConfiguration = Job.getInstance(config)
// 读取Kafka
val topics = Array("result1")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// 读取另外两个topic
// 每30秒一个滑动窗口处理30秒内到达的数据,将Json数据解析出来
val resultJson = stream.map(record => record.value).window(Seconds(30), Seconds(30)).transform(rdd => {
...
})
// resultJson2和resultJson3类似
val resultJson2 =
val resultJson3 =
// 合并流
val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))
// 遍历每个RDD,存入HBase
resultJsonAll.foreachRDD(rdd => {
val hbasePuts3 = data3.rdd.map((row: Row) => {
...
})
hbasePuts3.saveAsNewAPIHadoopDataset(newAPIJobConfiguration3.getConfiguration)
})
ssc.start() // 开始计算
ssc.awaitTermination()
} // 等待停止计算
上面这段代码描述了一个典型的Spark Streaming 的处理流程。它的功能是从Kafka拉取数据,经过一系列的转换,将结果存入HBase。我们可以看到流处理应用程序和批处理应用程序的一些区别。批处理应用程序拥有清晰的生命周期,它们一旦处理了输入文件就完成了执行。而上面的流处理应用程序的执行没有开始和停止的标记。
几个决定Spark Streaming应用程序生命周期的方法:
方法 | 描述 |
---|---|
start() | 开始执行应用程序 |
awaitTermination() | 等待应用程序终止 |
stop() | 强制应用程序停止执行 |
一个Spark应用程序的执行过程如下图
Yarn-Cluster运行模式执行过程
spark 控制进程
守护进程(Daemon) | 描述 |
---|---|
Driver(驱动程序) | 包含SparkContext实例的应用程序入口点 |
Master(主进程) | 负责调度和资源编排 |
Worker(子进程) | 负责节点状态和运行执行器 |
Executor(执行器) | 根据作业分配,负责执行该作业派发的任务 |
为了减少网络流量,强烈建议在集群机器上运行驱动程序,例如在Master节点,特别是需要驱动程序从Worker中提取数据的情况。
Spark分层执行结构
实体 | 描述 |
---|---|
Application(应用程序) | SparkContext的一个实例 |
Job(作业) | 一个Action后执行的一组阶段 |
Stage(阶段) | 在shuffle内的一组转换 |
Task set(任务组) | 来自同一组阶段的任务组 |
Task(任务) | 一个阶段里的执行单元 |
有了上面的背景,我们下面便从几个方面来讨论下Spark Streaming的优化。
2. 调优
2.1 并行化
2.1.1 执行器Executor
num-executors
执行器是一个在每个Worker上执行的JVM进程。那么如何选择执行器的数量呢?理论上来说,既然executor是JVM进程,应该多一点才好。但是我们在选择executor数量的时候,有几条经验可供参考:
- 为每个节点上的操作系统和其他服务留出一些资源
- 如果在YARN上运行,也占用应用程序Master
executor-memory
该参数用于设置每个Executor进程的内存,Executor内存的大小,很多时候直接决定了Spark作业的性能。根据团队的资源队列的最大内存限制是多少, num-executors
乘以 executor-memory
,不能超过队列的最大内存量。建议申请的内存量最好不要超过资源队列最大总内存的1/3~1/2。
executor-cores
该参数置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。根据自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。建议 num-executors
* executor-cores
不要超过队列总CPU core的1/3~1/2左右。
2.1.2 任务(Task)
Spark中的task是执行的单元。任务以线程而不是执行器 的进程执行。每个DStream由RDD组成,而RDD又由分区组成。每个分区是一块独立的数据,由一个任务操作。因为一个RDD中的分区数与任务数之间存在几乎一对一的映射。也就是说,DStream并行度是分区数的函数。该分区数取决于依赖关系类型:跨过DStream时如果落在窄依赖区,分区数保持不变,但经过shuffle区由于宽依赖的缘故,这个分区数会发生改变。
shuffle的分区数由 spark.default.parallelism
决定,或者如果 spark.default.parallelism
未设置,则由构成父DStream的RDD中的最大分区数决定。
实现完全优化的并行度的最佳方法,就是不断试错,和常规Spark应用的调优的方法一样,控制逐渐增加分区的个数,每次将分区数乘以1.5,直到性能停止改进位置。这可以通过Spark UI 进行校准。
综上从Executor和Task的角度,得到Spark Streaming 的一些优化方法,提交Spark作业的脚本大概为:
代码语言:javascript复制 ./spark-submit
--master yarn
--num-executors 30
--executor-memory 6G
--executor-cores 2
--driver-memory 1G
--conf spark.default.parallelism=120
--conf spark.storage.memoryFraction=0.5
--conf spark.shuffle.memoryFraction=0.3
--class com.example.PreJoin Stream-1.0-SNAPSHOT-jar-with-dependencies.jar
2.1.3 创建更多的输入DStream和Receive
每个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个DStream达到接收多个数据流的效果。
比如,一个接收多个Kafka Topic的输入DStream,可以拆分成多个输入DStream,每个分别接收一个topic的数据。这样就会创建多个Receiver,从而并行地接收数据,提高吞吐量。然后多个DStream使用union算子进行合并,从而形成一个DStream。核心代码如下:
代码语言:javascript复制// 合并流
val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))
2.2 批处理间隔
批处理间隔是另一个直接影响性能的重要因素。它确定了微批处理的周期,也就是规定了每个微批处理能够通过的数据量。批处理间隔设置得太高则每个批处理会有高延迟,设置得太低则导致资源利用不足。
理想的状况是能够以线路速率处理数据,例如数据源每300毫秒发送一次,那么我们也可以这样假设:处理管道数据的延迟时间也为300毫秒。
如何设置批处理间隔,最好采取的策略是每次试验都从高值开始,比如1.5倍。Spark日志可用于计算系统的稳定性,即批处理间隔能否跟上数据速率。在日志中查找 Totaldelay
总延迟。如果此值保持接近批处理间隔,则系统是稳定的。否则尝试增加2.1所述的并行化来减少管道的延迟。
假如日志如下:
代码语言:javascript复制JobScheduler: Total delay: 2.236 s for time 1581859740000 ms (execution: 2.256 s)
这表示该批处理的总延迟为3秒,小于批处理间隔。假设在某些时候可能会出现数据峰值,那么5秒是个不错的值。此外还可以通过Spark UI了解每阶段的延迟细目。Spark UI我们会在另一篇文章详细介绍。
2.3 内存
RDD基于内存计算,在内存中缓存所有内容,会给堆和垃圾收集器增加很大压力。如果应用程序的堆空间不足,可以增加 spark.executor.memory
。此外有一些情况,Spark还会使用堆外内存,例如Java NIO采用的字节缓冲区。在YARN上,这个额外的内存分配由 spark.yarn.executor.memoryOverhead
处理,默认值为 max(executorMemory*0.10384)
。如果应用程序使用大量的堆外内存,那么应该增加这个因子。
一般来说,增加堆大小或堆外内存属于最后才会考虑的操作。我们首要的目标是减少应用程序的内存占用。下面介绍实现这一目标的三种方法。
2.3.1 序列化
RDD以序列化形式保存在内存中,可以减少内存使用并改善垃圾收集。默认情况下Spark使用Java序列化,这并不是很高效。Spark支持Kryo,Kryo更有效且性能高,可以将 spark.serializer
设置为 org.apache.spark.serializer.KryoSerializer
来启用Kryo。
2.3.2 压缩
除了序列化RDD之外。还可以将 spark.rdd.compress
设置为true来进行压缩。
2.3.3 垃圾收集
流处理应用程序大量的对象增加了JVM垃圾收集的压力,频繁的GC会增加程序的延迟。建议对驱动程序和执行器使用CMS垃圾收集器,与应用程序同时运行垃圾收集来缩短暂停时间。
通过传递 --driver-java-options-XX: UseConcMarkSweepGC
到 spark-submit
,为驱动程序启动CMS。对于执行器,将参数 spark.executor.extraJavaOptions
设置为 XX: UseConcMarkSweepGC
,来启用CMS垃圾收集。
2.4 Shuffle
每次触发shuffle都会在集群中来回复制数据,这将付出很高的磁盘和网络I/O开销。因此在设计流应用程序的时候应该遵循一些原则:
2.4.1 提前投影过滤
提前进行投影和过滤,可以减少下游算子处理的数据。
2.4.2 多使用Combiner
Combiner使用的是map端聚合,可以减少在shuffle过程中需要处理的数据量。如使用reduceByKey( )可以在shuffle之前的分区级别启用本地聚合。
2.4.2 大量运用并行化
shuffle操作内部使用分组操作的Hash映射来对分区空间进行分隔,这可能会导致堆空间耗尽。通过增加*ByKey()任务的的并行度,减少其工作集来避免这种情况。
2.4.3 文件合并
在大量shuffle任务的情况下,合并中间文件以改善磁盘查找是很有用的。可以设置 spark.shuffle.consolidateFiles
为 true
,启用合并。
2.4.4 更多内存
RDD,shuffle和应用程序对象之间共用执行器Java堆。默认情况下,RDD使用内存的60%( spark.storage.memoryFraction
),shuffle使用20%( spark.shuffle.memoryFraction
)。过多地使用将使shuffle聚合阶段的数据溢出到磁盘。如果使用shuffle比较多,则可以适当增加shuffle内存的的占用比例,以减少对磁盘的溢出次数。
Shuffle所涉及的问题比较复杂,调优的点也很多,我们会在另一篇文章详细介绍。