大数据开发-Spark编程

2022-04-28 08:52:19 浏览数 (1)

Windows环境

https://cloud.tencent.com/developer/article/1987790

两种操作

转换操作

  • filter
  • map
  • flatMap
  • groupByKey
  • reduceByKey

行动操作

  • count()
  • collect()
  • first()
  • take(n)
  • reduce(func)
  • foreach(func)
  • saveAsTextFile()

转换操作并不会开始计算,只是记录下要做啥计算,之后在调用行动操作的时候才会开始计算,并且每次都是从开始开始计算。

Spark编程概要

获取sc

代码语言:javascript复制
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
  }
}

RDD创建

RDD创建的两种方式

方式1 加载文件

代码语言:javascript复制
val inputFile = "file:///D:\spark_study\wordcount.txt"
val rdd = sc.textFile(inputFile)

或者

代码语言:javascript复制
val inputFile = "hdfs://localhost:9000/user/hadoop/wordcount.txt"
val rdd = sc.textFile(inputFile)

方式2

代码语言:javascript复制
val list = Array(1, 2, 3, 4, 5, 6, 7, 8)
var rdd = sc.parallelize(list)

简单示例

代码语言:javascript复制
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val inputFile = "file:///D:\spark_study\wordcount.txt"
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(inputFile)
    val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a   b)
    wordCount.foreach(println)
  }
}

持久化

代码语言:javascript复制
val list = Array(1, 2, 3, 4, 5, 6, 7, 8)
var rdd = sc.parallelize(list, 2)
rdd = rdd.filter(item => item > 1)
rdd.cache()
//rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.foreach(println)
rdd.filter(item => item < 7).foreach(println)

注意

rdd持久化可以用两个方法

  • rdd.cache()
  • rdd.persist(StorageLevel.MEMORY_ONLY)

这两个方法作用是一样的,只不过后者可以设置持久化的位置,cache()则是直接持久化到内存中。

分区

代码语言:javascript复制
val arr = Array(1, 2, 3, 4, 5)
// 分区
val rdd = sc.parallelize(arr, 2)
rdd.filter(item => item > 2).foreach(println)

Pair

创建

代码语言:javascript复制
val list = List("Hadoop", "Spark", "Hive", "Spark")
val rdd = sc.parallelize(list)
val pairRDD = rdd.map(word => (word, 1))
pairRDD.foreach(println)

或者

代码语言:javascript复制
val rdd = sc.textFile("file:///D:\spark_study\wordcount.txt")
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
pairRDD.foreach(println)

wordcount.txt

代码语言:javascript复制
good good study
day day up

reduceByKey

代码语言:javascript复制
val list = List("good good study", "day day up")
val rdd = sc.parallelize(list)
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
pairRDD.reduceByKey((a, b) => a   b).foreach(println)

结果

(up,1) (day,2) (good,2) (study,1)

groupByKey()

代码语言:javascript复制
val list = List("good good study", "day day up")
val rdd = sc.parallelize(list)
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
pairRDD.groupByKey().foreach(println)

结果

(up,CompactBuffer(1)) (day,CompactBuffer(1, 1)) (good,CompactBuffer(1, 1)) (study,CompactBuffer(1))

keys/values

代码语言:javascript复制
val list = List("good good study", "day day up")
val rdd = sc.parallelize(list)
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
val pairRDD2 = pairRDD.groupByKey()
pairRDD2.keys.foreach(println)
pairRDD2.values.foreach(println)

结果

up day good study CompactBuffer(1) CompactBuffer(1, 1) CompactBuffer(1, 1) CompactBuffer(1)

sortByKey()

代码语言:javascript复制
val list = List("good good study", "day day up")
val rdd = sc.parallelize(list)
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
pairRDD.sortByKey().foreach(println)

结果

(day,1) (day,1) (good,1) (good,1) (study,1) (up,1)

mapValues(func)

代码语言:javascript复制
val list = List("good good study", "day day up")
val rdd = sc.parallelize(list)
val pairRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
pairRDD.mapValues((x) => x   100).foreach(println)

结果

(good,101) (good,101) (study,101) (day,101) (day,101) (up,101)

join

代码语言:javascript复制
val pairRDD1 = sc.parallelize(Array(("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5)))
val pairRDD2 = sc.parallelize(Array(("spark", 100)))
println("join:")
pairRDD1.join(pairRDD2).foreach(println)
println("fullOuterJoin:")
pairRDD1.fullOuterJoin(pairRDD2).foreach(println)
println("leftOuterJoin:")
pairRDD1.leftOuterJoin(pairRDD2).foreach(println)
println("rightOuterJoin:")
pairRDD1.rightOuterJoin(pairRDD2).foreach(println)

结果

join: (spark,(1,100)) (spark,(2,100)) fullOuterJoin: (spark,(Some(1),Some(100))) (spark,(Some(2),Some(100))) (hadoop,(Some(3),None)) (hadoop,(Some(5),None)) leftOuterJoin: (spark,(1,Some(100))) (spark,(2,Some(100))) (hadoop,(3,None)) (hadoop,(5,None)) rightOuterJoin: (spark,(Some(1),100)) (spark,(Some(2),100))

共享变量

广播变量

广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。

可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:

代码语言:javascript复制
val broadcastVar = sc.broadcast(Array(1, 2, 3))
println(broadcastVar.value.mkString("Array(", ", ", ")"))

这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。

累加器

累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。

Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。

一个数值型的累加器,可以通过调用SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()来创建。

运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。

下面是一个代码实例,演示了使用累加器来对一个数组中的元素进行求和:

代码语言:javascript复制
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value)

结果

10

JSON处理

转换的库的网址

https://github.com/json4s/json4s/

代码语言:javascript复制
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

implicit val formats = Serialization.formats(ShortTypeHints(List()))
val testjson = """{"name":"joe","age":15,"luckNumbers":[1,2,3,4,5]}"""
val p = parse(testjson).extract[Person]
println(p.name)
println(p.age)
println(p.luckNumbers)

JSON文件json.txt

代码语言:javascript复制
{"name": "xiaoming","age": 10, "luckNumbers":[1,2,3,4,5]}
{"name": "xiaohong","age": 18,"luckNumbers":[3,4,5]}
{"name": "xiaogang","age": 6,"luckNumbers":[1,2,3]}

代码

代码语言:javascript复制
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

object Test {
  case class Person(name: String, age: Int, luckNumbers: List[Int])

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val inputFile = "file:///D:\spark_study\json.txt"
    val rdd = sc.textFile(inputFile)
    rdd.foreach(item => {
      implicit val f1 = Serialization.formats(ShortTypeHints(List()))
      val p = parse(item).extract[Person]
      println(p.name)
    })
  }
}

结果

xiaoming

xiaohong

xiaogang

0 人点赞