Spark——RDD转换操作

2019-08-14 14:41:23 浏览数 (2)

概述

每一次转换操作都会产生不同的RDD,供给下一个操作使用。

惰性机制

RDD的转换过程是惰性求值的,也就是,整个转换过程只记录轨迹,并不会发生真正的计算,只有遇到了行动操作时,才会触发真正的计算。

filter(func)

过滤出满足函数func的元素,并返回存入一个新的数据集

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,2,3,4,5,6))
    val result = rdd.filter(_%2==0)
    println(result.collect().mkString(","))

map(func)

将每个元素传递到函数func中进行操作,并将结果返回为一个新的数据集。 collect()以数组的形式返回rdd的结果,但列表中每个数乘以2

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,2,3,4,5,6))
    val mapResult = rdd.map(_*2)
    println(mapResult.collect().toBuffer)

flatMap(func)

与map相似,但是每个输入元素都可以映射到0或多个输出结果

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array("a b c","b c d"))
    val result = rdd.flatMap(_.split(" "))
    println(result.collect().mkString(","))

sample

参数1 是否抽出的数据放回 参数2 抽样比例 浮点型 参数3 种子,默认值

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)
    val result = rdd.sample(false,0.5)
    println(result.collect().mkString(","))

union

求并集

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1,3,4))
    val rdd2 = sc.parallelize(List(2,3,4))
    val result = rdd1.union(rdd2)
    println(result.collect().toBuffer)

intersection

求交集

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1,3,4))
    val rdd2 = sc.parallelize(List(2,3,4))
    val result = rdd1.intersection(rdd2)
    println(result.collect().toBuffer)

distinct

去除重复元素

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,3,4,3,5,1))
    val result = rdd.distinct()
    println(result.collect().toBuffer)

groupByKey(func)

应用于(K,V)键值的数据集时,返回一个新的(K,Iterable)形式的数据集

reduceByKey(func)

应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个Key传递到函数func中进行聚合后的结果。

持久化

RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算,每次调用行动操作,都会触发一次从头开始的计算,这个对于迭代计算而言,代价非常大,因为迭代计算经常需要多次使用同一组的数据。

代码语言:javascript复制
    val conf = new SparkConf().setAppName("spark").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("Spark", "Hadoop", "Hive")
    val rdd = sc.parallelize(list)
    println(rdd.count())
    println(rdd.collect().mkString(","))
persist()

persist(MEMORY_ONLY) 表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。一般使用cache 调用persist(MEMORY_ONLY)

0 人点赞