Spark 创建算子源码解析

2022-03-23 14:12:35 浏览数 (1)

Spark创建方式可以通过集合进行创建,或者通过HDFS等存储文件创建,还可以基于其他算子进行转换操作。

1. 基于集合的创建

parallelize(seq, numSlices)

  • 使用方式

通过parallelize创建RDD, 可以将driver端的集合创建为RDD。通过传入Array或Seq,并设置其分区值,创建ParallelCollectionRDD。

代码语言:javascript复制
val rdd = spark.sparkContext.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
  • 源码解析
代码语言:javascript复制
override def getPartitions: Array[Partition] = {
  // RDD调用slice方法
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

parallelize实质是使用ParallelCollectionRDD.slice将数组中的数据进行切分,并分配到各个分区中。

代码语言:javascript复制
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i   1) * length) / numSlices).toInt
    (start, end)
  }
}

拆分的规则,如上所示,将start =(分区id * 数据总长度)/ 分区数, end=(分区id 1 * 数据总长度)/ 分区数, 分区id从0开始。最后调用Array.slice方法将数据进行切分。

分区数默认为:conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)), 机器总核数和2的最大值。

  • makeRDD 实质是调用parallelize(seq, numSlices)算子。不过其还有另一个方法,def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]), 可以指定数据的优先位置。

eg:

代码语言:javascript复制
val rdd = spark.sparkContext.makeRDD(Seq((1 to 10,Seq("host1", "host2")),
Seq(11 to 20,Seq("host3"))))
println(rdd.preferredLocations(rdd.partitions(0)))

2. 基于存储的创建

textfile(path, minPartitions): RDD[String]

textfile函数是用来读取hdfs文件系统上的文件,并返回String类型的数据。

其是基于HadoopRDD实现的。

代码语言:javascript复制
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

hadoopRDD的返回值是key-value形式,key为分区id, 再经过map操作,过滤为仅仅value数据值。

textFile在读取hdfs上文件前,先从本地获取hadoopConfiguration配置信息,并将其封装为广播变量,broadcast(new SerializableConfiguration(hadoopConfiguration))。

代码语言:javascript复制
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  ...
  try {
    // 获取输入文件的切分
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    // 分区数等于inputSplits数
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    ...
}

分区数即为获取输入文件的切分数。

而切分数和几个因素有关:minPartitions, goalSize,blockSize

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会 1,则为(fileSize/goalSize) 1。

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

0 人点赞