Spark Streaming Programming Examples


1. Produce a real-time stream

nc -lk 1234

hello hadoop word
hello spark hbase
hive hello china

​

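All of the examples below are plain Scala programs. To compile and run them you need Spark Streaming on the classpath, plus Spark SQL, Typesafe Config and the MySQL JDBC driver for the later sections. A minimal build.sbt sketch is shown here; the version numbers are assumptions and should be adjusted to match your cluster.

// build.sbt -- a minimal sketch; version numbers are assumptions, adjust them to your environment
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % "2.3.1",
  "org.apache.spark" %% "spark-streaming"      % "2.3.1",
  "org.apache.spark" %% "spark-sql"            % "2.3.1",  // MySQL / Spark SQL examples
  "com.typesafe"      % "config"               % "1.3.3",  // loads application.conf
  "mysql"             % "mysql-connector-java" % "5.1.47"  // JDBC driver for the MySQL examples
)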
2. MyNetworkWordCount.scala

package day10

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyNetworkWordCount {

  Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    //Create the StreamingContext object
    val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    //Define the batch interval: collect data every 2 seconds
    val ssc = new StreamingContext(sparkConf,Seconds(2))
    //Create a discretized stream; the DStream represents the input data stream
    val lines = ssc.socketTextStream("hadoop01",1234)

    //Process the data
    val words = lines.flatMap(_.split(" "))
    val result = words.map(x => (x,1)).reduceByKey(_ + _)

    //Print the result
    result.print()

    ssc.start()

    ssc.awaitTermination()
  }
}
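
Note that reduceByKey only aggregates within the current 2-second batch, so the counts start from zero in every batch. Assuming the three sample lines from section 1 arrive within a single batch, result.print() shows something roughly like the following (timestamp and ordering will differ):

-------------------------------------------
Time: 1539846000000 ms
-------------------------------------------
(hive,1)
(hello,3)
(word,1)
(hadoop,1)
(spark,1)
(hbase,1)
(china,1)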

3. MyNetworkWordCount_total.scala (set a checkpoint)

package day10

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyNetworkWordCount_total {
  def main(args: Array[String]): Unit = {
    //Silence the Spark logs
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    //Set up the configuration
    val conf = new SparkConf().setAppName("StreamWordCountTotal").setMaster("local[2]")
    //Create the StreamingContext
    val ssc = new StreamingContext(conf,Seconds(3))

    //Set the checkpoint directory
    ssc.checkpoint("./cpt")
    //Create a discretized stream; the DStream represents the input stream
    val lines = ssc.socketTextStream("hadoop01",1234)
    //Split into words
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word,1))

    //Build the state update function
    val updateStateFunc = (currValues:Seq[Int],preValues:Option[Int])=>{
      val curr = currValues.sum
      val pre = preValues.getOrElse(0)
      Some(curr + pre)
    }

    //Accumulate the counts across batches
    val totalvalue = pairs.updateStateByKey(updateStateFunc)

    totalvalue.print()

    ssc.start()

    ssc.awaitTermination()
  }

}
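
The behaviour of updateStateFunc is easiest to see by calling it directly with hand-made values (a sketch with hypothetical batch contents, outside of streaming):

// A minimal sketch: the same update function applied to made-up batch values
val updateStateFunc = (currValues: Seq[Int], preValues: Option[Int]) => {
  Some(currValues.sum + preValues.getOrElse(0))
}
println(updateStateFunc(Seq(1, 1, 1), None))  // Some(3): first batch, no previous state for the key
println(updateStateFunc(Seq(1), Some(3)))     // Some(4): a later batch is added to the stored total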

4. MyNetworkTotalWordCountV2.scala (a real-time word-frequency program that accumulates word counts)

package cn.edu360.spark.day10

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * A real-time word-frequency program that accumulates word counts across batches.
  *
  */
object MyNetworkTotalWordCountV2 {
  val ckp = "./ckp"

  /**
    * This function is applied to the values that share the same key
    * @param newValues
    * @param runningCount
    * @return
    */
  def updataFunction(newValues:Seq[Int], runningCount:Option[Int]):Option[Int] = {
    //Sum of the values in the current batch
    val currentTotal = newValues.sum
    //Accumulate: if the word appears for the first time there is no previous value
    val totalValues = runningCount.getOrElse(0)
    Some(currentTotal + totalValues)
  }

  //If the previous job's ssc instance cannot be recovered from the checkpoint directory, create a new one
  def funcToCreateContext (): StreamingContext={
    println("create new ssc ..........")
    //Create the StreamingContext object
    val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

    //Define the batch interval: collect data every 2 seconds; this value cannot be chosen arbitrarily
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //Set the checkpoint directory
    ssc.checkpoint(ckp)

    //Create a discretized stream; the DStream represents the input data stream
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)

    //Set the checkpoint interval for this stream (the default is to checkpoint every 10 seconds)
    lines.checkpoint(Seconds(6))

    //Split into words, counting each occurrence once
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(x=>(x, 1))

    //Accumulate the counts across batches
    val totalResult = pairs.updateStateByKey(updataFunction _)
    totalResult.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    //Recover the StreamingContext from the checkpoint directory, or create a new one if that is not possible
    val ssc = StreamingContext.getOrCreate(ckp, funcToCreateContext _)

    //Start the computation
    ssc.start()

    //Wait for the computation to be terminated
    ssc.awaitTermination()
  }
}
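
When testing recovery it helps to stop the application cleanly so the checkpoint directory is left in a consistent state. One common pattern (a sketch, not part of the original code) is to register a shutdown hook in main right after getOrCreate:

// Sketch: stop the StreamingContext gracefully when the JVM shuts down
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

On the next start, StreamingContext.getOrCreate finds the checkpoint in ./ckp and rebuilds the context from it, so "create new ssc ..." is only printed the very first time.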

5. OnlineHotItems.scala (window functions)

package cn.edu360.spark.day10

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Background:
  *   In social networks (Weibo), e-commerce (JD), search engines (Baidu) and stock trading, one of the things
  *   people care about is: among the content I follow, what is everyone paying attention to right now?
  *   This is very valuable in real businesses.
  *   For example: what has everyone been searching for over the last 30 minutes, updated every 5 minutes,
  *   listing the top three search topics?
  *
  *   Data format:
  *     hadoop,201810080102
  *     spark,201810080103
  *
  *   Problem:
  *     The code below recomputes all the data of the previous 60 seconds every 20 seconds. If the window is
  *     long, the amount of data that has to be recomputed is large and the job becomes very slow.
  *   Solution:
  *     searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1 + v2, (v1:Int, v2:Int) => v1 - v2, Seconds(60), Seconds(20))
  *
  */
object OnlineHotItems {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    //Create the StreamingContext object
    val sparkConf = new SparkConf().setAppName("OnlineHotItems").setMaster("local[2]")

    /**
      * The batch interval set here is the basic unit in which Spark Streaming generates jobs;
      * the window length and the sliding interval must both be integer multiples of this batch interval.
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //Create a discretized stream; the DStream represents the input data stream
    val hottestStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)

    /**
      * A search record is simplified to the format item,time. Since we only need the hot items,
      * it is enough to extract the item and map it to the (item, 1) format.
      */
    val searchPair = hottestStream.map(_.split(",")(0)).filter(!_.isEmpty).map(item=>(item, 1))
    val hottestDStream: DStream[(String, Int)] = searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1 + v2, Seconds(60), Seconds(20))

    val result: DStream[(String, Int)] = hottestDStream.transform(hottestRDD => {
      val top3: Array[(String, Int)] = hottestRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
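
The optimised variant mentioned in the header comment subtracts the values that slide out of the window instead of recomputing the whole 60 seconds; it additionally requires a checkpoint directory. A sketch of the change (the directory name is an assumption):

// Sketch: incremental window aggregation with an inverse reduce function; requires checkpointing
ssc.checkpoint("./ckp_hotitems")
val hottestDStream: DStream[(String, Int)] = searchPair.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2,   // values entering the window are added
  (v1: Int, v2: Int) => v1 - v2,   // values leaving the window are subtracted
  Seconds(60), Seconds(20))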

6. MyNetworkWordCountMysql.scala (save the results to MySQL)

There is a configuration file, application.conf:

db.url="jdbc:mysql://hadoop01:3306/bigdata"
db.user="root"
db.password="123456"
db.table="test"

MyNetworkWordCountMysql.scala:

package cn.edu360.spark.day11

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

/**
  * Write the real-time word-count results to a MySQL database.
  *
  */
object MyNetworkWordCountMysql {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    //Loads the config files under resources; default resolution order: application.conf -> application.json -> application.properties
    val config = ConfigFactory.load()

    //Create the StreamingContext object
    val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

    //Define the batch interval: collect data every 5 seconds; this value cannot be chosen arbitrarily
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //Create a discretized stream; the DStream represents the input data stream
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 1234)

    //Insert the results computed for the current batch
    lines.foreachRDD(rdd => {
      //Create a SparkSession object
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()

      //Convert the RDD into a DataFrame
      import spark.implicits._
      //"words" is the column name; the table has a single column
      val wordsDataFrame = rdd.flatMap(line => line.split(" ")).toDF("words")

      //Create a temporary view
      wordsDataFrame.createOrReplaceTempView("wordcount")

      //Run SQL to compute the word counts
      val result = spark.sql("select words, count(*) as total from wordcount group by words")

      //Wrap the user name and password
      val props = new Properties()
      props.setProperty("user", config.getString("db.user"))
      props.setProperty("password", config.getString("db.password"))

      if (!result.rdd.isEmpty()) {
        result.write.mode(SaveMode.Append).jdbc(config.getString("db.url"), config.getString("db.table"), props)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
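
Because the results are written with SaveMode.Append, every batch adds new rows to the target table instead of updating running totals, so the same word can appear once per batch. A query like the following (a sketch using the table name from application.conf) aggregates them; the next section instead keeps the totals up to date inside the table:

-- Sketch: aggregate the per-batch rows appended by the job above
SELECT words, SUM(total) AS total
FROM test
GROUP BY words;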

7. MyNetworkWordCountMysqlState.scala (enhanced MySQL version)

package cn.edu360.spark.day11

import java.sql.DriverManager

import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

/**
  * Write the real-time word-count results to a MySQL database (state kept in the database).
  * Created by zhangjingcun on 2018/10/9 9:43.
  */
object MyNetworkWordCountMysqlState {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    //Loads the config files under resources; default resolution order: application.conf -> application.json -> application.properties
    val config = ConfigFactory.load()

    //Create the StreamingContext object
    val sparkConf = new SparkConf().setAppName("MyNetworkWordCountMysqlState").setMaster("local[2]")

    //Define the batch interval: collect data every 2 seconds; this value cannot be chosen arbitrarily
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //Create a discretized stream; the DStream represents the input data stream
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 1234)

    /**
      * Insert the results computed for the current batch.
      * foreachRDD runs on the driver;
      * foreachPartition and foreach run on the workers.
      */
    lines.foreachRDD(rdd => {
      //Compute the result of the current batch
      val curr_batch_result: RDD[(String, Int)] = rdd.flatMap(line => line.split(" ")).filter(!_.isEmpty).map(word => (word, 1)).reduceByKey(_ + _)

      //Insert the results of the current batch
      curr_batch_result.foreachPartition(partition => {
        //Create a connection
        val url = config.getString("db.url")
        val user = config.getString("db.user")
        val password = config.getString("db.password")
        val conn = DriverManager.getConnection(url, user, password)

        //Insert every record of the current partition into MySQL
        partition.foreach(tp => {
          val word = tp._1
          //Check whether the word has already been inserted; if so update it, otherwise insert it
          val pstmts = conn.prepareStatement("select * from wordcount where words=?")
          pstmts.setString(1, word)
          val rs = pstmts.executeQuery()
          var flag = false
          while (rs.next()) {
            println(s"${word} already exists in the database")
            flag = true  //the word already exists
            //The word is already there, so update it
            val dbCurrCount = rs.getInt("total")
            //Compute the new count
            val newCount = dbCurrCount + tp._2
            //Update
            val update = conn.prepareStatement("update wordcount set total=? where words=?")
            update.setInt(1, newCount)
            update.setString(2, word)
            update.executeUpdate()
            update.close()
          }
          rs.close()
          pstmts.close()

          if (!flag){
            //Insert a new row
            val stmts = conn.prepareStatement("insert into wordcount values(?,?)")
            stmts.setString(1, tp._1)
            stmts.setInt(2, tp._2)
            stmts.executeUpdate()
            stmts.close()
          }
        })
        if (conn!=null) conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

}
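
The code above assumes that the wordcount table already exists, with a words column and an integer total column; a minimal DDL sketch (column types and the primary key are assumptions):

-- Sketch of the table assumed by MyNetworkWordCountMysqlState
CREATE TABLE wordcount (
  words VARCHAR(100) PRIMARY KEY,
  total INT
);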

8. SparkStreaming_SparkSql

Use Spark SQL to query the data inside a Spark Streaming stream.

package day11

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_SparkSql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreaming_SparkSql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))
    val lines = ssc.socketTextStream("hadoop01",1234)
    val words = lines.flatMap(_.split(" "))

    //Use Spark SQL to query the streaming data from Spark Streaming
    words.foreachRDD(rdd => {
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val wordsDataFrame = rdd.toDF("words")
      wordsDataFrame.createOrReplaceTempView("t_words")
      val result = spark.sql("select words,count(*) as total from t_words group by words")
      result.show()
    })

    ssc.start()

    ssc.awaitTermination()

  }

}
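
With the sample input from section 1, result.show() prints one small table per batch, roughly like the following (row order varies and only a few rows are shown):

+------+-----+
| words|total|
+------+-----+
| hello|    3|
|hadoop|    1|
| spark|    1|
+------+-----+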
