Scala写Spark笔记

2023-12-25 17:15:31 浏览数 (2)

学习感悟

(1)配置环境最费劲

(2)动手写,动手写,动手写

WordCount

代码语言:javascript复制
package wordcount

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Word-count example: reads a text file, counts every space-separated
  * token, prints the counts to the console and saves them as text files.
  *
  * @author CBeann
  * @create 2019-08-10 18:02
  */
object WordCount {

  /**
    * Runs the job on a local master with 8 threads.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the master and the application name.
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create the SparkContext.
    val sc = new SparkContext(conf)
    try {
      // Backslashes of a Windows path must be escaped inside a regular string
      // literal; unescaped, "\U" does not compile and "\t" silently becomes a TAB.
      val data = sc.textFile("C:\\Users\\Lenovo\\Desktop\\leetcode.txt")

      // Split each line on single spaces, pair every token with 1 and sum the
      // ones per token. The second argument asks for a single result partition.
      val result = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1)

      // Print to the console.
      result.collect().foreach(println)

      // Save to disk.
      result.saveAsTextFile("F:\\temp\\aa")
    } finally {
      // Release the context even when one of the actions above fails.
      sc.stop()
    }
    println("-----over-----")
  }

}

排序

代码语言:javascript复制
第一种方式:按照某一字段排序
val result = data.sortBy(_._2, false)

第二种方式:用类继承Ordered
val result =data.sortBy(x => Boy(x._1,x._2,x._3),false)
代码语言:javascript复制
package mysort

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Sorting example: orders (name, faceValue, age) tuples in descending order
  * by wrapping each tuple in a [[Boy]], whose `Ordered` implementation
  * supplies the comparison.
  *
  * @author CBeann
  * @create 2019-08-10 18:26
  */
object MysortDemo {

  /**
    * Runs the job on a local master with 8 threads.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the master and the application name.
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create the SparkContext.
    val sc = new SparkContext(conf)
    try {
      val data = sc.makeRDD(List(("张三", 10, 14), ("张三", 9, 9), ("张三", 13, 15)))

      // Option 1: sort by a single field.
      //   val result = data.sortBy(_._2, ascending = false)

      // Option 2: sort by a key class that extends Ordered.
      val result = data.sortBy(x => Boy(x._1, x._2, x._3), ascending = false)

      // Print to the console.
      result.collect().foreach(println)
    } finally {
      // The original never stopped the context; release it like WordCount does.
      sc.stop()
    }
  }

}

/**
  * Sort key ordered by `faceVale` first and by `age` when the face values tie.
  *
  * @param name     display name; not part of the ordering
  * @param faceVale primary sort field
  * @param age      secondary sort field
  */
case class Boy(name: String, faceVale: Int, age: Int) extends Ordered[Boy] {

  /**
    * Compares this boy with `that`.
    *
    * @param that the other boy
    * @return a negative value, zero or a positive value when this boy sorts
    *         before, equal to or after `that`
    */
  override def compare(that: Boy): Int = {
    // Integer.compare instead of subtraction: `a - b` overflows for operands
    // far apart (e.g. Int.MinValue - 1) and would report the wrong sign.
    val byFaceValue = Integer.compare(this.faceVale, that.faceVale)
    if (byFaceValue != 0) byFaceValue else Integer.compare(this.age, that.age)
  }
}

自定义分区

自定义分区器
代码语言:javascript复制
package mypartition

import org.apache.spark.Partitioner

import scala.collection.mutable

/**
  * Custom partitioner that routes keys to partitions through a fixed lookup
  * table: "Java" -> 0, "Scala" -> 1, "Go" -> 2. Every other key, including
  * `null`, goes to partition 0.
  *
  * @author CBeann
  * @create 2019-08-10 18:36
  */
class MyPartitioner extends Partitioner {

  // Lookup table from the key's string form to its partition index.
  val map = new mutable.HashMap[String, Int]()
  map.put("Java", 0)
  map.put("Scala", 1)
  map.put("Go", 2)

  /** Total number of partitions: one per entry of the lookup table. */
  override def numPartitions: Int = map.size

  /**
    * Resolves the partition for a key.
    *
    * @param key the record key; may be `null`
    * @return the index registered for `key.toString`, or 0 when the key is
    *         `null` or not present in the table
    */
  override def getPartition(key: Any): Int = key match {
    // Guard first: calling toString on a null key would throw an NPE.
    case null  => 0
    case other => map.getOrElse(other.toString, 0)
  }

  /**
    * Two partitioners are equal when their lookup tables are equal; Spark
    * compares partitioners to decide whether two RDDs are partitioned alike.
    */
  override def equals(other: Any): Boolean = other match {
    case that: MyPartitioner => this.map == that.map
    case _                   => false
  }

  /** Kept consistent with `equals`. */
  override def hashCode(): Int = map.hashCode()
}
测试类
代码语言:javascript复制
package mypartition

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Demonstrates `MyPartitioner`: repartitions a small pair RDD with it and
  * writes the result to disk as text files.
  *
  * @author CBeann
  * @create 2019-08-10 18:59
  */
object PartitionDemo {

  /**
    * Runs the job on a local master with 8 threads.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the master and the application name.
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create the SparkContext.
    val sc = new SparkContext(conf)
    try {
      val data = sc.makeRDD(List(("Java", 11), ("Java", 9), ("Scala", 13), ("Go", 11)))

      val result = data.partitionBy(new MyPartitioner)

      // Backslashes must be escaped: unescaped, "\t" is a TAB and "\a" is not
      // a valid escape sequence, so the literal would not compile.
      result.saveAsTextFile("F:\\temp\\aaa")
    } finally {
      // The original never stopped the context; release it like WordCount does.
      sc.stop()
    }

    println("--------------OVER------------")
  }

}

SparkSQL

person.json

代码语言:javascript复制
{  "name": "王小二",   "age": 15}
{  "name": "王小三",   "age": 25}
{  "name": "王小四",   "age": 35}

测试类

代码语言:javascript复制
package sparksql

import org.apache.spark.sql.SparkSession

/**
  * Spark SQL example: loads person.json into a DataFrame and selects the
  * rows with age > 21, once through the DataFrame API and once through SQL
  * on a temporary view.
  *
  * @author CBeann
  * @create 2019-08-10 18:20
  */
object SparkSqlDemo {

  /**
    * Runs the example on a local master with 8 threads.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // Create (or reuse) the SparkSession and set the application name.
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example").master("local[8]")
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    // (also provides the $"column" syntax used below).
    import spark.implicits._

    try {
      // Backslashes of a Windows path must be escaped inside a regular string
      // literal; unescaped sequences such as "\I" or "\S" do not compile.
      val df = spark.read.json("E:\\IntelliJ IDEA 2019.1.3Workspace\\ScalaSpark\\SparkDemo\\src\\main\\resources\\json\\person.json")

      // Displays the content of the DataFrame to stdout.
      df.show()

      df.filter($"age" > 21).show()

      // Register the DataFrame as a view so that it can be queried with SQL.
      df.createOrReplaceTempView("persons")

      spark.sql("SELECT * FROM persons where age > 21").show()
    } finally {
      // Release the session even when reading or querying fails.
      spark.stop()
    }

    printf("-----over---------")
  }

}

Spark Streaming

无状态wordcount
代码语言:javascript复制
package stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Stateless streaming word count: every 5-second batch of lines read from a
  * socket is counted on its own and printed.
  *
  * @author CBeann
  * @create 2019-08-10 18:38
  */
object StreamDemo {

  /**
    * Starts the streaming job and blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // Create the SparkConf.
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Create the StreamingContext entry point with a 5-second batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))

    // Continuously read lines from port 9999 of the given host.
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)
    //val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ",9999));

    // Word count for the current batch: the `+` that sums the per-word ones
    // was missing from the listing, so it did not compile.
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    result.print()

    // Start the streaming computation.
    ssc.start()
    // Block until the computation is stopped.
    ssc.awaitTermination()

    printf("--------OVER-------------")
  }

}
有状态wordcount

updateStateByKey 方法是关键:需要传入一个参数类型固定的更新函数,其参数依次为当前批次中同一 key 的值序列(Seq[Int])和该 key 之前累计的状态(Option[Int]),返回新的状态(Option[Int])。

代码语言:javascript复制
package stream.withstatus

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import stream.MyRecever

/**
  * Stateful streaming word count: per-batch counts are merged into a running
  * total per word with `updateStateByKey`.
  *
  * @author CBeann
  * @create 2019-08-10 19:24
  */
object UpdateStateByKeyTest {

  /**
    * Starts the streaming job and blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf.
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Create the StreamingContext entry point with a 5-second batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))

    // Checkpoint directory. Backslashes must be escaped: unescaped, "\t" is a
    // TAB and "\a" is not a valid escape sequence.
    ssc.checkpoint("F:\\temp\\aaa")

    // Continuously read lines from port 9999 of the given host.
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)

    // Word count for the current batch.
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

    // Key step: fold the batch counts into the state kept across batches.
    val allresult = result.updateStateByKey(updateFunction)

    allresult.print()

    // Start the streaming computation.
    ssc.start()
    // Block until the computation is stopped.
    ssc.awaitTermination()

    printf("--------OVER-------------")
  }

  /**
    * State update function passed to `updateStateByKey`. The parameter types
    * are fixed by that API; the parameter names are free.
    *
    * @param currValues values of one key in the current batch
    * @param preValue   state of that key from earlier batches, if any
    * @return the new state: the sum of the current values plus the previous
    *         state (0 when there is none)
    */
  def updateFunction(currValues: Seq[Int], preValue: Option[Int]): Option[Int] = {
    // Total for the current batch.
    val currValueSum = currValues.sum
    // Total accumulated before the current batch.
    val oldValueSum = preValue.getOrElse(0)
    // The `+` was missing from the listing, so it did not compile.
    Some(currValueSum + oldValueSum)
  }

}
自定义接收器
代码语言:javascript复制
package stream

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
  * Custom receiver that connects to `host:port`, reads UTF-8 text line by
  * line and hands every line to Spark through `store`.
  *
  * @param host host name to connect to
  * @param port TCP port to connect to
  * @author CBeann
  * @create 2019-08-10 18:39
  */
class MyRecever(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Called when the receiver starts: reads the socket on its own thread. */
  override def onStart(): Unit = {
    new Thread("MyRecever") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
    * Called when the receiver stops. Nothing to clean up here: the reading
    * loop in `receive` checks `isStopped` and ends by itself, and the socket
    * is closed there. The original body was `???`, which throws
    * NotImplementedError whenever the receiver is stopped.
    */
  override def onStop(): Unit = ()

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    try {
      // Connect to host:port
      val socket = new Socket(host, port)
      try {
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

        // Until stopped or connection broken continue reading and storing
        Iterator
          .continually(reader.readLine())
          .takeWhile(line => line != null && !isStopped)
          .foreach(line => store(line))
      } finally {
        // Closing the socket also closes its input stream; doing it in
        // `finally` releases the connection even when reading fails.
        socket.close()
      }

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart(s"Error connecting to $host:$port", e)
      case scala.util.control.NonFatal(t) =>
        // restart on any other recoverable error; fatal JVM errors propagate
        restart("Error receiving data", t)
    }
  }

}
代码语言:javascript复制
val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ",9999));

pom.xml

代码语言:javascript复制
 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

0 人点赞