前言
最近在拜读许老师的《大数据处理框架Apache Spark设计与实现》,之前看豆瓣评分很高,阅读了一下果然通俗易懂,在这里记录一下相关的笔记,补充了一些个人理解,如有不对还请指正。参考链接:https://github.com/JerryLead/SparkInternals
1.1 Spark部署
Spark在集群上部署有多个版本:Standalone、Mesos、YARN、Kubernetes。其中Standalone是Spark官方的,其他都是第三方框架。YARN是目前主流之一,可以同时管理Spark、Hadoop Mapreduce任务。
1.2 Spark 系统架构
1.2.1 基本名词概念
Spark和MapReduce一样是Master-Worker结构。由于在介绍Spark原理的时候会涉及到很多名词,一不小心就容易搞混淆,因此先梳理一下几个名词:
- Master节点:本质上是一台机器,常驻Master进程,负责分配任务以及监控Worker存活。
- Worker节点:本质上是多台机器,常驻Worker进程,负责执行任务以及监控任务运行状态。
- Spark Application:用户自己写的程序,比如 HelloWorld.scala。
- Spark Driver:一个进程。负责运行main(),以及创建SparkContext。如果是 YARN 集群,那么 Driver 可能被调度到 Worker 节点上运行(比如上图中的 Worker Node 2)。
- Executor:一个JVM进程。一个Worker可以管理一个或多个Executor,但一个Executor只有一个线程池,线程池里有多个线程,每个线程可以执行一个 task。Spark先以Executor为单位占用集群资源,然后Driver再分配任务执行。通常来说一个Executor可以分配多个CPU和内存。
- Task:一个Executor内的线程,最小的计算单位。一个task一般使用一个CPU,且多个task共享同一个Executor的内存。
- Job:Spark的作业。通常执行几次action(),就会有几个作业数。比如count()两次就有两个Job。
- Stage:Spark Job的阶段。一个Job可以分为1~n个stage。(物理执行计划里面的概念)
- Partition:数据的分区。分区个数可以决定该数据最多部署在几台机器上。
- RDD:本质上是一个封装好的抽象类(abstract class)。并行数据集的抽象表示(Resilient Distributed Datasets, RDD)。另外提一下,Spark的Dataframe是在RDD基础上再封装的。
1.2.2优点缺点
以上介绍可以看出来Spark这么设计相比于Hadoop MapReduce的优点和缺点:
- 优点:多个task以线程形式执行,互相可以共享内存,避免资源浪费;同时线程启动比进程启动更快。(MR里面的task是以java进程方式运行)
- 缺点:多个task之间由于是线程的形式会导致资源竞争,另外多个task并行的日志会比较混乱。
1.3 Spark应用例子
1.3.1 GroupBy例子
下面举一个groupby的例子,来了解spark运行的流程。
代码语言:java复制```scala
package org.apache.spark.examples
import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
/**
* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
var numMappers = 3 //该应用包含3个map task
var numKVPairs = 4 //每个task随机生成4个<K,V>record
var valSize = 1000 //每个Value大小1000byte
var numReducers = 2 //由于随机产生的key会有重复,groupby聚合过程使用2个reduce task
val sc = new SparkContext(sparkConf) //也可以通过SparkSession初始化
// 生成一个k-v形式的array
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count() //=numMappers * numKVPairs = 12,到这里cache()才会生效
println(pairs1.groupByKey(numReducers).count()) //=4
sc.stop()
}
}
```
代码大致意思是:先用RDD的形式生成一堆key-value形式的数组,key是随机给0~Int最大值,value是一个随机的byte。然后调用groupby和count,把相同的key聚合,计算个数。
这里需要注意,真正在写应用的时候一般不用自己指定map task的个数,通常自动计算为:
MapTask个数=frac{输入数据大小}{每个分片大小(HDFS默认是128MB)}
实际执行流程比自己的要复杂,需要先建立逻辑处理流程(Logical Plan),然后根据逻辑处理流程生成 物理逻辑流程(Physical Plan),然后生成具体 task 执行。
1.3.2逻辑处理流程(Logical Plan)
逻辑处理流程用通俗的话来说就是:各种各样的RDD不停的变换。逻辑处理流程表示的是数据上的依赖关系,不是 task 的执行图。仔细观察上面代码可以发现,action()一共有两次:
- 一次是flatmap生成array之后进行了一次count()。
- 一次是groupby之后进行了一次count()。
由于第二次count()时候数据依赖于前面,因此以变量result
为例。使用 result.toDebugString
输出日志,可以看到整个逻辑处理流程如下:
(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
-(3) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:41 []
| CachedPartitions: 3; MemorySize: 12.4 кв;
ExternalBlockStoreSize: 0.0 в; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at GroupByTest. scala:41 []
不难看出生成了3个RDD,后面的中括号是生成的顺序,即:ParallelCollectionRDD
→ MapPartitionsRDD
→ ShuffledRDD
。
另外,(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
前面的小括号里面的(2)、(3)是分区(partition)个数的意思。
这个日志“从里往外“看:可以看出来由于生成了3个数组,因此一直到MapPartitionsRDD
都是3个分区。另外,由于我们使用了cache(),因此其中的3个分区计算时候会被缓存为CachedPartitions
。而到groupby的时候,由于我们指定了var numReducers = 2
,因此变成了2个分区。并且转成了ShuffledRDD
。
1.3.3 物理执行计划(Physical Plan)
上一节说的逻辑处理流程(Logical Plan)基本上可以理解是RDD之间的变化的关系,但是并不能执行计算任务,因此需要再转换成物理执行计划(Physical Plan)对任务执行。其中包括执行阶段(Stage)和执行任务(Task)。简单来说可以分成三个步骤:
- 确定应用(Application)会产生哪些作业(Job)。 比如上面例子因为count()两次,就是两个Job。
- 将每个作业(Job)拆分成1~n个执行阶段(Stage)。 这里是根据逻辑处理流程的数据依赖关系来拆分。比如上面例子第一个Job就只拆了1个stage,而第二个Job拆成了2个Stage。为什么这么拆,后面再说。
- 确定执行任务(task)的个数和种类。 这里是依据RDD的分区(Partition)个数来确定,比如第二个Job,一开始是3个partition,因此在stage 0 里面是3个task来计算任务。到了stage 1,由于定义了
var numReducers = 2
,变成了2个分区,因此在这里是2个task来计算任务。stage 0→stage 1,这个过程称为shuffle机制,会将数据重新分配。
注:为什么要拆分执行阶段(Stage)?
- 便于并行执行。 先看同一个stage里面,多个task大小合适,且为同构的,并行起来方便。
- 提高数据处理效率。 再看同一个task里面,多个操作串行处理,效率高。
- 方便错误容忍。 如果一个stage挂了,直接重新运行这个stage就行了,不用把整个job都重新运行。
1.4 查看日志
如果想知道自己Spark Application的运行流程,可以根据Spark提供的执行界面查看。
1.4.1 查看Job日志
Job日志可以查看Stage的运行的情况
1.4.2 查看Stage日志
上图点开后,可以看到多个stage,点击stage的超链接(或者从Job那边点超链接也可以),可以查看该stage的运行情况。
打开之后可以查看DAG,查看RDD的生成顺序,同时也可以查看每个task的运行时间,方便排查问题。