SparkCore: RDD

2022-04-26 15:02:10

The five key properties of an RDD

  • A list of partitions: an RDD consists of multiple partitions; the partition is the unit of parallelism inside an RDD.
  • A function for computing each split: the compute logic applied to each split/partition.
  • A list of dependencies on other RDDs: an RDD can depend on other RDDs; for example, the result of one RDD is processed by the next RDD in the chain.
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key-value RDDs can be partitioned by hash; this is similar to the Partitioner interface in MapReduce, which controls which reducer a key is sent to.
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): a preferred location for each partition, assigned automatically by Spark.

https://blog.csdn.net/zym1117/article/details/79532458
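
These five properties map directly onto members of the abstract RDD class. Below is a simplified sketch for illustration only (signatures abbreviated; the real class also takes a SparkContext and a ClassTag):

import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Simplified view of the five members behind the five properties above.
abstract class SimplifiedRDD[T] {
  // 1. A list of partitions
  protected def getPartitions: Array[Partition]
  // 2. A function for computing each split
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // 3. A list of dependencies on other RDDs
  protected def getDependencies: Seq[Dependency[_]]
  // 4. Optionally, a Partitioner for key-value RDDs
  @transient val partitioner: Option[Partitioner] = None
  // 5. Optionally, a list of preferred locations to compute each split on
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}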

Ways to create an RDD

  1. Creating from a local collection

makeRDD

def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)

Under the hood it delegates to parallelize:

def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
}

seq is the collection to turn into an RDD; numSlices specifies the number of partitions. If numSlices is not given, a default is used; from the source we can see that this default comes from the spark.default.parallelism configuration:

override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)
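
So in local mode, if spark.default.parallelism is set explicitly, it takes precedence over totalCores. A quick hedged check (the values here are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("custom-app")
  .set("spark.default.parallelism", "8")   // overrides totalCores as the default
val sc = new SparkContext(conf)

// No numSlices passed, so defaultParallelism (8 here) is used
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
println(rdd.getNumPartitions)   // expected: 8

With that in mind, let's see where totalCores itself comes from.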

totalCores is passed in by the LocalSchedulerBackend class. Why LocalSchedulerBackend? Because I am running in local mode; in standalone cluster mode the backend would be StandaloneSchedulerBackend instead.

private[spark] class LocalSchedulerBackend(conf: SparkConf,scheduler: TaskSchedulerImpl,val totalCores: Int)

The LocalSchedulerBackend is created by the SparkContext:

val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
val sc=new SparkContext(conf)

master is exactly the value passed to .setMaster("local[4]"); in other words, it is fixed when the SparkContext is initialized. SparkContext then pattern-matches on it:

master match {
      case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        checkResourcesPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        // For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
        // to false because this mode is intended to be used for testing and in this case all the
        // executors are running on the same host. So if host local reading was enabled here then
        // testing of the remote fetching would be secondary as setting this config explicitly to
        // false would be required in most of the unit test (despite the fact that remote fetching
        // is much more frequent in production).
        sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }

local has three variants:

  • setMaster("local");默认指定一个处理线程
case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

From this code we can see that totalCores defaults to 1:

val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
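
So with setMaster("local") and no explicit numSlices, an RDD created from a collection gets a single partition. A quick hedged check:

val conf = new SparkConf().setMaster("local").setAppName("custom-app")
val sc = new SparkContext(conf)
println(sc.makeRDD(List(1, 2, 3, 4)).getNumPartitions)   // expected: 1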
  • setMaster("local[*]") 或者 setMaster("local[N]"); * 表示服务器cpu总核数 N 表示自己指定一个core

A regular expression is used to decide whether the value passed to .setMaster("local[4]") is of the form local[*] or local[N]:

val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// We set local[4], so threads = 4
case LOCAL_N_REGEX(threads) =>
        // Get the number of CPU cores on this machine
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
       // Verify that the thread count is greater than 0
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        // From the code above:
        //   if "*" was given, threadCount = the machine's CPU core count
        //   if a number was given, threadCount is exactly that number
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

Now let's verify that the number of partitions is indeed 4, as expected:

def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.makeRDD(list)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
}
Output:
Partitions=4
List(1, 2, 3, 4, 5, 6, 7, 8)

With local[*] (my machine has 16 cores), the output is:

Partitions=16
List(1, 2, 3, 4, 5, 6, 7, 8)

Besides relying on the default (assigned by Spark), we can of course also set the number of partitions ourselves, for example 3:

  def main(args: Array[String]): Unit = {
    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.makeRDD(list,3)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
  }
Output:
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)

As mentioned earlier, makeRDD is just parallelize under the hood, so calling parallelize directly works equally well:

def main(args: Array[String]): Unit = {
    //val lines: RDD[String] = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",3)
    val list=List(1,2,3,4,5,6,7,8)
    val value: RDD[Int] = sc.parallelize(list,3)
    println(s"Partitions=${value.getNumPartitions}")
    println(value.collect.toList)
  }
Output:
Partitions=3
List(1, 2, 3, 4, 5, 6, 7, 8)

The underlying implementation of parallelize:

def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
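
ParallelCollectionRDD simply cuts the Seq into numSlices contiguous ranges. The sketch below is a rough reconstruction of that slicing rule (it ignores the special handling of Range collections), plus a way to observe the actual layout with glom:

// Rough sketch of how a collection of length `len` is cut into `numSlices` ranges.
def sliceBounds(len: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    (((i * len.toLong) / numSlices).toInt, (((i + 1) * len.toLong) / numSlices).toInt)
  }

println(sliceBounds(8, 3))   // Vector((0,2), (2,5), (5,8))

// glom() collects each partition into an array, so we can see the real layout:
sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
  .glom()
  .collect()
  .foreach(a => println(a.mkString("[", ", ", "]")))
// expected: [1, 2]  [3, 4, 5]  [6, 7, 8]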
  2. Creating by reading a file

When creating an RDD by reading a file, path resolution depends on whether HADOOP_CONF_DIR is set in the cluster configuration.

If HADOOP_CONF_DIR is set, Spark reads from HDFS by default:
  • Read an HDFS file: sc.textFile("/.../..."), sc.textFile("hdfs:///.../..."), or sc.textFile("hdfs://hadoop102:8020/.../...")
  • Read a local file: sc.textFile("file:///.../...")

If HADOOP_CONF_DIR is not set, Spark reads from the local file system by default:
  • Read an HDFS file: sc.textFile("hdfs://hadoop102:8020/.../...")
  • Read a local file: sc.textFile("/.../...") or sc.textFile("file:///.../...")

Reading an HDFS file (hdfs://):

  @Test
  def readHdfs():Unit={
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val lines=sc.textFile("hdfs://hadoop102:9820/input/wordcount")
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)

  }

JUnit is used here to make testing more convenient.

Output:
Partitions=2
List(你好 google , python 你好 count word hello, count 你好 google)

Reading a local file (file://):

  @Test
  def readLocalFile():Unit={
    val conf=new SparkConf().setMaster("local[4]").setAppName("custom-app")
    val sc=new SparkContext(conf)

    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt")
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)
  }
Output:
Partitions=2
List(hello java shell, python java java, wahaha java shell)

We specified local[4], so why is Partitions=2? Let's look at the textFile source. From its parameter list we can see that, besides the file path, it also takes a minPartitions argument; if it is not supplied, a default is used:

def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
  /**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Tracing through the source, defaultParallelism is the same value we saw earlier (below), so defaultParallelism = 4 here:

override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)

So when minPartitions is not specified, its default is math.min(defaultParallelism, 2) = math.min(4, 2) = 2, i.e. at most 2, which is why Partitions came out as 2 rather than 4.

Let's try specifying a partition count ourselves, say 5:

@Test
def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",5)
    println(s"Partitions=${lines.getNumPartitions}")
    println(lines.collect.toList)
}
Output:
Partitions=5

For an RDD created by reading a file:
  1. If minPartitions is set, the number of partitions >= minPartitions.
  2. If minPartitions is not set, the number of partitions >= Math.min(defaultParallelism, 2).
In the end, the number of partitions is determined by the number of file splits.

  3. Deriving from another RDD: completing the word count example
 @Test
  def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
    println(s"Partitions=${lines.getNumPartitions}")

    val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
    val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })

    println(value.collect.toList)
  }

An RDD produced from another RDD through operators such as flatMap and map is said to be derived from it.

Output:
Partitions=4
List((python,2), (wahaha,2), (shell,3), (hello,2), (java,7))
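
As a side note (not part of the original walkthrough), the same word count can be written more idiomatically with reduceByKey, which combines counts per key before shuffling instead of grouping all occurrences:

val counts: RDD[(String, Int)] = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

println(counts.collect.toList)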
To summarize, for an RDD created from a collection:
  • If numSlices is passed to parallelize/makeRDD, the number of partitions = numSlices.
  • If numSlices is not passed, the number of partitions = defaultParallelism:
      • If spark.default.parallelism is set, defaultParallelism = the value of spark.default.parallelism.
      • If spark.default.parallelism is not set:
          1. master = local[N]: number of partitions = N
          2. master = local[*]: number of partitions = number of CPU cores on the machine
          3. master = local: number of partitions = 1
          4. master = spark://...: number of partitions = total number of executor CPU cores for this job
  • How textFile splits files into partitions

Run the program:

 @Test
  def readLocalFile():Unit={
    val lines= sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)
    println(s"Partitions=${lines.getNumPartitions}")

    val groupList: RDD[(String, Iterable[String])] = lines.flatMap(_.split(" ")).groupBy(x=>x)
    val value: RDD[(String, Int)] = groupList.map({ case (k, v) => (k, v.size) })
    println(value.collect.toList)
  }

Size of worldCount.txt:

267 bytes

The split computation comes from Hadoop's FileInputFormat.getSplits:

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // Total size of all input files; totalSize = 267 here
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }
    // Target size per split: 267 / 4 = 66 (integer division)
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    // Minimum split size in bytes, 1 by default
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // splits will hold the resulting file splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
     // Only non-empty files are split
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Check whether the file is splittable
        if (isSplitable(fs, path)) {
          // Block size: 32 MB on the local file system, 128 MB on HDFS
          long blockSize = file.getBlockSize();
          // Compute the split size (as analyzed below, splitSize = 66 here)
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
          // Compute the splits in a loop.
          // SPLIT_SLOP = 1.1 controls how the tail of the file is handled.
          // 1st pass: 267/66 = 4.045... ; bytesRemaining = 267-66 = 201
          // 2nd pass: 201/66 = 3.045... ; bytesRemaining = 201-66 = 135
          // 3rd pass: 135/66 = 2.045... ; bytesRemaining = 135-66 = 69
          // 4th pass: 69/66 = 1.045..., which is below SPLIT_SLOP, so the loop exits
          long bytesRemaining = length; // bytesRemaining = 267
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }
          // Since the last pass did not satisfy the while condition, the loop was not entered again;
          // all remaining bytes are put into one final split.
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    // splits.size() = 4 here; the RDD ends up with splits.size() partitions
    return splits.toArray(new FileSplit[splits.size()]);
  }

computeSplitSize is Hadoop's split-size formula:

// goalSize=66
// minSize=1
// blockSize = 32 << 20 bytes (32 MB)
protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
    // Math.min(goalSize, blockSize) takes the smaller value: 66
    // Math.max(minSize, 66) then takes the larger value: still 66
    return Math.max(minSize, Math.min(goalSize, blockSize));
}
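
To make the arithmetic concrete, here is a small Scala sketch (not Hadoop code, just the same loop with the constants from this example) that reproduces the split count for the 267-byte file; changing totalSize to 79 reproduces the 5-split case discussed next:

val totalSize  = 267L
val numSplits  = 4                          // the minPartitions passed to textFile
val goalSize   = totalSize / numSplits      // 66
val minSize    = 1L
val blockSize  = 32L << 20                  // 32 MB local block size
val splitSize  = math.max(minSize, math.min(goalSize, blockSize))   // 66

val SPLIT_SLOP = 1.1
var bytesRemaining = totalSize
var splits = 0
while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
  splits += 1
  bytesRemaining -= splitSize
}
if (bytesRemaining != 0) splits += 1        // the remaining bytes form the last split
println(splits)                             // expected: 4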

Now resize worldCount.txt:

79 bytes

This time the program ends up with 5 partitions:

Partitions=5

Let's walk through the split computation again.

Get the total file size:

long totalSize = 79; 

Compute the target size per split: goalSize = 79 / 4 = 19 (integer division):

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

Get the minimum split size: minSize = 1:

long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

Compute the split size: splitSize = 19:

long splitSize = computeSplitSize(goalSize, minSize, blockSize);

Split the file:

          // 1st pass: 79/19 = 4.157... ; bytesRemaining = 79-19 = 60
          // 2nd pass: 60/19 = 3.157... ; bytesRemaining = 60-19 = 41
          // 3rd pass: 41/19 = 2.157... ; bytesRemaining = 41-19 = 22
          // 4th pass: 22/19 = 1.157... ; bytesRemaining = 22-19 = 3
          // 5th pass: 3/19 = 0.157..., which fails the while condition, so the loop exits
          long bytesRemaining = length; // bytesRemaining =79
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }
          // The remaining 3 bytes become the fifth and final split
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }

This computation produces 5 splits, so the final number of partitions is 5.

Although we requested 4 splits above, that value is only a lower bound. The actual number of partitions is determined by how many splits the file is ultimately cut into.

val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
