Spark之基本流程(一)

2023-08-24 07:56:45 浏览数 (2)

前言

最近在拜读许老师的《大数据处理框架Apache Spark设计与实现》,之前看豆瓣评分很高,阅读了一下果然通俗易懂,在这里记录一下相关的笔记,补充了一些个人理解,如有不对还请指正。参考链接:https://github.com/JerryLead/SparkInternals

1.1 Spark部署

Spark在集群上部署有多个版本:Standalone、Mesos、YARN、Kubernetes。其中Standalone是Spark官方的,其他都是第三方框架。YARN是目前主流之一,可以同时管理Spark、Hadoop Mapreduce任务。

1.2 Spark 系统架构

1.2.1 基本名词概念

Spark和MapReduce一样是Master-Worker结构。由于在介绍Spark原理的时候会涉及到很多名词,一不小心就容易搞混淆,因此先梳理一下几个名词:

  • Master节点:本质上是一台机器,常驻Master进程,负责分配任务以及监控Worker存活。
  • Worker节点:本质上是多台机器,常驻Worker进程,负责执行任务以及监控任务运行状态。
  • Spark Application:用户自己写的程序,比如 HelloWorld.scala。
  • Spark Driver:一个进程。负责运行main(),以及创建SparkContext。如果是 YARN 集群,那么 Driver 可能被调度到 Worker 节点上运行(比如上图中的 Worker Node 2)。
  • Executor:一个JVM进程。一个Worker可以管理一个或多个Executor,但一个Executor只有一个线程池,线程池里有多个线程,每个线程可以执行一个 task。Spark先以Executor为单位占用集群资源,然后Driver再分配任务执行。通常来说一个Executor可以分配多个CPU和内存。
  • Task:一个Executor内的线程,最小的计算单位。一个task一般使用一个CPU,且多个task共享同一个Executor的内存。
  • Job:Spark的作业。通常执行几次action(),就会有几个作业数。比如count()两次就有两个Job。
  • Stage:Spark Job的阶段。一个Job可以分为1~n个stage。(物理执行计划里面的概念)
  • Partition:数据的分区。分区个数可以决定该数据最多部署在几台机器上。
  • RDD:本质上是一个封装好的抽象类(abstract class)。并行数据集的抽象表示(Resilient Distributed Datasets, RDD)。另外提一下,Spark的Dataframe是在RDD基础上再封装的。

1.2.2优点缺点

以上介绍可以看出来Spark这么设计相比于Hadoop MapReduce的优点和缺点:

  • 优点:多个task以线程形式执行,互相可以共享内存,避免资源浪费;同时线程启动比进程启动更快。(MR里面的task是以java进程方式运行)
  • 缺点:多个task之间由于是线程的形式会导致资源竞争,另外多个task并行的日志会比较混乱。

1.3 Spark应用例子

1.3.1 GroupBy例子

下面举一个groupby的例子,来了解spark运行的流程。

代码语言:java复制
```scala
package org.apache.spark.examples

import java.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
  * Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
  */
object GroupByTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("GroupBy Test")
    var numMappers = 3 //该应用包含3个map task 
    var numKVPairs = 4 //每个task随机生成4个<K,V>record 
    var valSize = 1000 //每个Value大小1000byte 
    var numReducers = 2 //由于随机产生的key会有重复,groupby聚合过程使用2个reduce task 

    val sc = new SparkContext(sparkConf) //也可以通过SparkSession初始化

    // 生成一个k-v形式的array
    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache()
    // Enforce that everything has been calculated and in cache
    pairs1.count() //=numMappers * numKVPairs = 12,到这里cache()才会生效

    println(pairs1.groupByKey(numReducers).count()) //=4

    sc.stop()
  }
}

```

代码大致意思是:先用RDD的形式生成一堆key-value形式的数组,key是随机给0~Int最大值,value是一个随机的byte。然后调用groupby和count,把相同的key聚合,计算个数。

这里需要注意,真正在写应用的时候一般不用自己指定map task的个数,通常自动计算为:

MapTask个数=frac{输入数据大小}{每个分片大小(HDFS默认是128MB)}

实际执行流程比自己的要复杂,需要先建立逻辑处理流程(Logical Plan),然后根据逻辑处理流程生成 物理逻辑流程(Physical Plan),然后生成具体 task 执行。

1.3.2逻辑处理流程(Logical Plan)

逻辑处理流程通俗的话来说就是:各种各样的RDD不停的变换。逻辑处理流程表示的是数据上的依赖关系,不是 task 的执行图。仔细观察上面代码可以发现,action()一共有两次:

  1. 一次是flatmap生成array之后进行了一次count()。
  2. 一次是groupby之后进行了一次count()。

由于第二次count()时候数据依赖于前面,因此以变量result为例。使用 result.toDebugString 输出日志,可以看到整个逻辑处理流程如下:

代码语言:scala复制
(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
   -(3) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:41 []
     |      CachedPartitions: 3; MemorySize: 12.4 кв;
            ExternalBlockStoreSize: 0.0 в; DiskSize: 0.0 B
     |  ParallelCollectionRDD[0] at parallelize at GroupByTest. scala:41 []

不难看出生成了3个RDD,后面的中括号是生成的顺序,即:ParallelCollectionRDDMapPartitionsRDDShuffledRDD

另外,(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []前面的小括号里面的(2)、(3)是分区(partition)个数的意思。

这个日志“从里往外“看:可以看出来由于生成了3个数组,因此一直到MapPartitionsRDD都是3个分区。另外,由于我们使用了cache(),因此其中的3个分区计算时候会被缓存为CachedPartitions。而到groupby的时候,由于我们指定了var numReducers = 2,因此变成了2个分区。并且转成了ShuffledRDD

1.3.3 物理执行计划(Physical Plan)

上一节说的逻辑处理流程(Logical Plan)基本上可以理解是RDD之间的变化的关系,但是并不能执行计算任务,因此需要再转换成物理执行计划(Physical Plan)对任务执行。其中包括执行阶段(Stage)和执行任务(Task)。简单来说可以分成三个步骤:

  1. 确定应用(Application)会产生哪些作业(Job)。 比如上面例子因为count()两次,就是两个Job。
  2. 将每个作业(Job)拆分成1~n个执行阶段(Stage)。 这里是根据逻辑处理流程的数据依赖关系来拆分。比如上面例子第一个Job就只拆了1个stage,而第二个Job拆成了2个Stage。为什么这么拆,后面再说。
  3. 确定执行任务(task)的个数和种类。 这里是依据RDD的分区(Partition)个数来确定,比如第二个Job,一开始是3个partition,因此在stage 0 里面是3个task来计算任务。到了stage 1,由于定义了var numReducers = 2,变成了2个分区,因此在这里是2个task来计算任务。stage 0→stage 1,这个过程称为shuffle机制,会将数据重新分配。

注:为什么要拆分执行阶段(Stage)?

  1. 便于并行执行。 先看同一个stage里面,多个task大小合适,且为同构的,并行起来方便。
  2. 提高数据处理效率。 再看同一个task里面,多个操作串行处理,效率高。
  3. 方便错误容忍。 如果一个stage挂了,直接重新运行这个stage就行了,不用把整个job都重新运行。

1.4 查看日志

如果想知道自己Spark Application的运行流程,可以根据Spark提供的执行界面查看。

1.4.1 查看Job日志

Job日志可以查看Stage的运行的情况

1.4.2 查看Stage日志

上图点开后,可以看到多个stage,点击stage的超链接(或者从Job那边点超链接也可以),可以查看该stage的运行情况。

打开之后可以查看DAG,查看RDD的生成顺序,同时也可以查看每个task的运行时间,方便排查问题。

0 人点赞