Spark Streaming Project in Practice (2) | Consuming Data from Kafka

2020-10-28 17:37:12

Write an app that reads data from Kafka

  Create a new Maven project: spark-streaming-project

  For the spark-streaming-kafka dependency, this project uses the 0-10_2.11 artifact rather than 0-8_2.11:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

I. Testing whether data can be consumed from Kafka

  • 1. Create App (a trait)
package com.buwenbuhuo.streaming.project.app


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
trait App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("App").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "bigdata0814",
      "auto.offset.reset" -> "latest",
      // let the Kafka consumer commit offsets automatically
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("ads_log0814")

    val sourceStream: DStream[String] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // the standard choice: spreads partitions evenly across executors
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())
    sourceStream.print(1000)


    ssc.start()
    ssc.awaitTermination()
  }
}
  • 2. Create AreaTopAPP
package com.buwenbuhuo.streaming.project.app

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AreaTopAPP extends App {

}
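  • 3. Run AreaTopAPP

  The above is only a test. Part of this app, though, can be pulled out and encapsulated in a new case class.

  The test returns the expected result. Section II gives the complete, final source code.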
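
  The test app sets `enable.auto.commit` to `true`, so the Kafka consumer commits offsets on its own schedule, whether or not the batch that read them has been processed. If that matters for a job, the Kafka 0-10 integration also lets the application commit offsets itself once a batch is done. The following is a minimal sketch of that alternative, not the approach this project uses. The object name `ManualCommitApp` is hypothetical; the brokers, group id, and topic are copied from the test app, and running it requires the same Kafka cluster.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical object name; the settings mirror the test app above.
object ManualCommitApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManualCommitApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "bigdata0814",
      "auto.offset.reset" -> "latest",
      // offsets are committed by the application, not by the consumer
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Keep a reference to the un-mapped stream: only its RDDs carry offset ranges.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("ads_log0814"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Read the offset ranges before any transformation of the RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Stand-in for real processing: print up to 1000 record values.
      rdd.map(_.value()).take(1000).foreach(println)

      // Commit only after the batch's output step has finished.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

  Committing after the output step means a batch that fails before the commit is read again on restart, so downstream output should tolerate duplicates.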

II. Complete source code

Write the app that reads data from Kafka

  • 1. Bean class AdsInfo
package com.buwenbuhuo.streaming.project.bean
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 17:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
case class AdsInfo(ts: Long,
                   area: String,
                   city: String,
                   userId: String,
                   adsId: String,
                   var timestamp: Timestamp = null,
                   var dayString: String = null, // e.g. 2020-08-14
                   var hmString: String = null) { // e.g. 11:20

  timestamp = new Timestamp(ts)

  val date = new Date(ts)
  dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)
  hmString = new SimpleDateFormat("HH:mm").format(date)
}
  • 2. Utility class MyKafkaUtils
package com.buwenbuhuo.streaming.project.util

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 15:20
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object MyKafkaUtils {

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata0814",
    "auto.offset.reset" -> "latest",
    // let the Kafka consumer commit offsets automatically
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

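  /**
   * Returns a stream of record values read from Kafka for the given topics.
   *
   * @param ssc    the StreamingContext the stream is attached to
   * @param topics one or more topic names to subscribe to
   * @return a DStream holding the value of each Kafka record
   */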
  def getKafkaSteam(ssc:StreamingContext,topics:String*): DStream[String] =

    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // the standard choice: spreads partitions evenly across executors
      Subscribe[String, String](topics.toIterable, kafkaParams)
    ).map(_.value())
}
  • 3. Consume data from Kafka (App)
package com.buwenbuhuo.streaming.project.app


import com.buwenbuhuo.streaming.project.bean.AdsInfo
import com.buwenbuhuo.streaming.project.util.MyKafkaUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
trait App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("App").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Get the raw stream of record values
    val sourceStream: DStream[String] = MyKafkaUtils.getKafkaSteam(ssc,"ads_log0814")

    // Each record is expected to be a comma-separated line: ts,area,city,userId,adsId
    val adsInfoStream: DStream[AdsInfo] = sourceStream.map(s => {
      val fields: Array[String] = s.split(",")
      AdsInfo(fields(0).toLong, fields(1), fields(2), fields(3), fields(4))
    })

    adsInfoStream.print(1000)

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 4. AreaTopAPP
package com.buwenbuhuo.streaming.project.app

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AreaTopAPP extends App {

}
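
  Two details of the code above may be worth hardening. `AdsInfo` stores its derived values in mutable `var` constructor parameters, so a caller can pass or later assign values that disagree with `ts`; and the `map` step in `App` throws on any record that does not have five comma-separated fields with a numeric timestamp. The following is a minimal sketch of one way to address both, not part of the original project. `AdRecord` and `parse` are hypothetical names, the field order is taken from the `AdsInfo` constructor call, and the explicit `zone` parameter is an addition (the original formats in the JVM's default time zone).

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical immutable variant of AdsInfo: the day and hour:minute strings
// are computed from ts on demand instead of being stored in mutable fields.
final case class AdRecord(ts: Long, area: String, city: String, userId: String, adsId: String) {

  // ts formatted as yyyy-MM-dd in the given zone (JVM default if omitted).
  def dayString(zone: ZoneId = ZoneId.systemDefault()): String =
    AdRecord.dayFormat.format(Instant.ofEpochMilli(ts).atZone(zone))

  // ts formatted as HH:mm in the given zone (JVM default if omitted).
  def hmString(zone: ZoneId = ZoneId.systemDefault()): String =
    AdRecord.hmFormat.format(Instant.ofEpochMilli(ts).atZone(zone))
}

object AdRecord {
  // DateTimeFormatter is immutable and thread-safe, so one instance can be shared.
  private val dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  private val hmFormat = DateTimeFormatter.ofPattern("HH:mm")

  // Parses "ts,area,city,userId,adsId". Returns None, rather than throwing,
  // when the field count is not five or ts is not a valid Long.
  // The -1 limit keeps trailing empty fields so they still count.
  def parse(line: String): Option[AdRecord] =
    line.split(",", -1).map(_.trim) match {
      case Array(ts, area, city, userId, adsId) =>
        Try(ts.toLong).toOption.map(AdRecord(_, area, city, userId, adsId))
      case _ => None
    }
}
```

  In `App`, something like `sourceStream.flatMap(s => AdRecord.parse(s).toList)` would then drop malformed lines instead of failing the batch.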

III. Run results

Run MockRealtimeData (the data producer) and AreaTopAPP (the data consumer) at the same time.

  That's all for this post.
