Spark Stage, DAG(Directed Acyclic Graph)
- Spark 划分 Stage 的依据是其根据提交的 Job 生成的
DAG
,在离散数学中我们学到了一个 有向无环图(Directed Acyclic Graph) 的概念,再生产环境中,我写的任务仅仅是 有向树(Directed tree) 级别,有向无环图还未遇到过。 但是可以想象到,如果在代码中使用了 RDD 的join
算子是有可能出现 有向无环图 的 DAG。对于我们组所使用的日志数据处理,主要还是集中在 有向树复杂度的 逻辑拓扑。
PS: 有向树一定是 有向无环图,有向无环图不一定都是有向树。可以自行脑补一下 将流程抽象为拓扑能够更好的将在其中添加各种优化措施,而不是像 Hadoop MapReduce 一般将每一步的结果都写回,造成大量的浪费。
- 在我们的业务场景中有这种情况,将原始搜集的日志,切割出小字段,并按序排列,这个操作我称之为 归一化。并对归一化数据进行一系列操作。
real_data.map(deal_data_func).reduceByKey(merge_data_func).foreachRDD(store_data_func)
- 在
store_data_func
中 使用foreachPartition
进行与存储化介质之间的联通。在 Spark 中,该方法称作action
RDD 的方法
- RDD 的方法分为两类
transformation
和action
,当且仅当action
被调用时,Spark
才会真正将任务提交至DAG调度器
,进而分配至Task调度器
如果在编写 Spark 项目时,仅仅做了 transformation 但并未提交 action,这时候 Spark Would do nothing!
real_data.map(deal_data_func).reduceByKey(merge_data_func)
这种写法放在寻常非Spark项目中一点也不意外,甚至可以认为是完整的。 这是与 MapReduce 最大的区别之一,因为 MapReduce 没有所谓的 Stage 划分,导致很多人看了网上的老代码,在新入手 Spark 时陷入这个误区。
- 之所以 Spark 需要在提交
action
之后才真正执行计算,是为了充分利用DAG
划分Stage
带来的优势,包括但不限于 减少计算量,I/O负载 等 - 在诸多
transformation
操作中,上篇提到,其又分为两类: 宽依赖(reduceByKey, ...),窄依赖(map,flatMap, ...)- 后者比起前者简单许多,仅仅是对每个
Partition
中的每个数据做一次映射,Partition
数目不变 - 前者就稍微复杂些,因为在该类型的操作中,我们的目的是获取全局数据的一种提取(如对相同 key 的 value 进行累加),但是当数据量大到无法在一台机器上全部容纳时,我们就需要 Spark 去调度并切分数据并重新分配
Partition
及其数据。 - 宽依赖 生成的 新RDD 的
Partition
数是初学者使用时最大的疑惑以及黑盒(包括我),在某天我终于忍不住,去查了源码,以reduceByKey
为例子: # reduceByKey 有三种函数签名,一目了然 1.def reduceByKey( partitioner: Partitioner, func: JFunction2[V, V, V] ) 2.def reduceByKey( func: JFunction2[V, V, V], numPartitions: Int ) 3.def reduceByKey( func: JFunction2[V, V, V] ) - 我们最常用的是 最简短,只带一个参数的重载模式
3
2
较3
多了一个参数numPartitions
,这个参数代表我们可以人为指定Partition
数目,所以很多网上说的Partition
为Spark
自己生成的 带有一定的误导性,但这个函数仅当十分了解Spark
调度原理时才使用。1
是本次的重点,其第一个参数是Partitioner
类型的变量,我们可以猜测,如果我们使用3
时,既不指定numPartitions
也不指定一个Partitioner
那必然有一个default
的东西,用于确定reduceByKey
后的Partition
数量- 继续翻阅源码,在
3
的函数实现中我们看见了defaultPartitioner
的实例化,并调用了1
: fromRDD(reduceByKey(defaultPartitioner(rdd), func)) # 签名,可以看出,该实例是至少要传入一个 rdd 作为参数的 def defaultPartitioner( rdd: RDD[_], others: RDD[_]* )
- 后者比起前者简单许多,仅仅是对每个
defaultPartitioner
是 Spark 一个自带的实现,其内实现了一段设置 新RDDPartition
逻辑- 如果是一个以上 旧RDD 传入,且没有设置
spark.default.parallelism
参数 则 新RDD 的Partition
数为 旧RDD 中最大的 - 如果设置
spark.default.parallelism
参数,则 新RDD 的Partition
数目为该参数的值。 val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) { rdd.context.defaultParallelism } else { rdds.map(_.partitions.length).max }
- 如果是一个以上 旧RDD 传入,且没有设置
spark.default.parallelism 参数在 Spark 项目初始化之时设置,保存在 SparkContext 中,用过 Spark 的人不会陌生,一般而言这个值设置成
Excutor * Excutor-core * 2
至此弄懂了Partition
数量的计算由来,相关看更详细的源码操作,可以阅读 Spark Core 中的 Partitioner.scala 文件,很简洁。
- RDD 中的
Partition
数目之所以重要,缘由便在于其在很大程度上决定了 Spark 的 并发 效果,上篇文章提到, RDD 的Partition
·与其所处Stage
中的task
一一对应,这也是这个spark.default.parallelism
参数名字的由来。
在 Spark 的 Patch 中对于 Partition 数目的选择一直是一个热议,大家有兴趣可以看看例如这个 Patch(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6377) ,但截至目前 Spark-2.3.2,依旧是我上述的结论 但是实际上 Spark SQL 已经有了一个动态调整 Partition 数量的功能代码, 1spark.sql.adaptive.enable=true1
Stage 的划分
- 实际上
Stage
的划分应该是最好理解的,或者说并不需要深究源码级别的理解,实际使用中,我们最需要留意的地方,便是在何时会发生Shuffle
,而Stage
的划分就是为了找出Shuffle
最该发生的位置 Shuffle
的发生意味着,数据可能会在不同节点间的迁移,内存向文件的写出操作,内存读取文件内容的一系列损耗较大的操作,90%以上的场景是需要越少Shuffle
越好。- 通过不同
transformation
的替换来达到这个目的,最经典的 用reduceByKey
替换groupByKey
就不再赘述,原理就是 前者会将本机数据先做一次聚合,再传输到其他节点上,减少Shuffle
Stage
在 Spark 本质中就是一系列可以 并行 执行的task
的集合,划分Stage
的标准便是 宽依赖 的出现。以文章开头处的例子为原型
- 通过不同
- 从图中可以看出,当执行到
reduceByKey
时,Shuffle
便开始了,如果你的 Spark 是一套用有 多 个节点的集群- 那么首先它会在本地进行
reduceByKey
,得到一份本地的唯一<key,value>
- 紧接着
Shuffle-Write(如Write Disk)
,由DAGScheduler
选择分配哪些数据到哪个节点(defaultpartitioner
决定) - 接着在目的节点
Shuffle-Read(如Read Network)
主动拉取数据 - 最后进行合并,此时对于任意节点上的任意
key
都是全局唯一的
- 那么首先它会在本地进行
- 以上能看出,想要降低
Shuffle
的消耗,除了减少Shuffle
的产生次数。还要尽量减少每次Shuffle
的数据量大小。
在
Shuffle
过后,我们的项目场景一般就需要存储计算结果,而计算结果的存放又在一定程度上决定了这批次任务是否能真正完成,大致可分为 就地存储 和 集中存储,将在下篇详述。
[Extra]Shuffle Read&Write 小触
这部分细节,实际上对实际项目中的应用没什么太大帮助,纯粹是了解一下 Spark 的内在,只需要知道的是, Shuffle 带来的各种 IO 无法避免,Spark 正在不断新增各种优化算法,来降低这部分的开销。 此处需要设定场景,我们用的是默认存储介质,
Shuffle Write
是向本地磁盘写入数据。
- 当一个份数据经过各种归一化,最后调用 窄依赖
transformation
时,依旧以上面的例子为背景。 - 此时首先发生了
Shuffle Write
,Spark 会先确定本次的 分区器(Partitioner
),由上面内容可知,分区器的作用有二:- 确定出 新RDD 的分区数
- 决定哪些数据被放到哪些分区
- 当 Spark 确定了分区数
- 首先它会用内部的算法将本地的数据先做一次
reduceByKey
- 紧接着在本地新建临时文件,此处会依据种种情形(例如 Partition 数量,序列化情况等)选择不同的 Shuffle Write 算法,将中间结果写出到磁盘。
- 根据
Partitioner
决定哪些key
的数据属于哪个分区,且在内存中按分区序号排序,当内存不足时,写出到磁盘,并带上索引文件,以标识不同分区数据(此文件是按序排列)。 - 最后当另一端准备拉取数据时,再将这些分布在不同文件中的相同分区的数据合并,传给另一端。
- 首先它会用内部的算法将本地的数据先做一次
- 图中,
1
处的Task
与 旧RDD的Partition
一一对应,在3
阶段做一次合并。4
阶段的Task
代表远端Shuffle Read
的Task
,其数量与 新RDD 的Partition
相同且一一对应。
此处有太多细节没有详述,因为 Shuffle Write 的算法有不少, Spark 根据情况来选择用哪种算法输出文件减少性能损耗。 上边所说的情况亦是其中的一种
SortShuffle
而已