spark streaming读取kafka内容并进行反序列化

2022-03-28 21:37:51 浏览数 (1)

环境:

scala:2.12

spark:3.1.2

本文介绍spark从kafka获取数据,并进行反序列化

代码语言:scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.ForeachWriter
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
/**
 * 将从kafka接收到的数据并使用jackson进行反序列化
 */
/**
 * Reads records from a Kafka topic via Spark Structured Streaming and
 * deserializes each record's JSON value into a [[Student]] with Jackson,
 * printing the result to stdout.
 *
 * Runs until externally terminated (`awaitTermination`).
 */
object KafkaToPostgresql {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("kafka to postgresql")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
      .option("subscribe", "topic-name")
      .option("startingOffsets", "latest")
      .load()

    // Show the Kafka source schema (key, value, topic, partition, offset, ...)
    df.printSchema()

    // Kafka delivers `value` as binary; cast it to a UTF-8 string column.
    val rowDataset = df.selectExpr("CAST(value AS STRING)")

    rowDataset.writeStream.foreach(new ForeachWriter[Row]() {
      // Build the mapper ONCE per writer instance rather than once per record:
      // ObjectMapper construction/configuration is expensive, and a configured
      // instance is reusable. `@transient lazy val` defers creation to the
      // executor instead of serializing the mapper from the driver.
      @transient private lazy val mapper: ObjectMapper = {
        val m = new ObjectMapper()
        // Tolerate extra JSON fields not present on the case class.
        m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        // Required so Jackson can bind JSON into Scala case classes.
        m.registerModule(DefaultScalaModule)
        m
      }

      override def open(partitionId: Long, epochId: Long): Boolean = true

      override def process(record: Row): Unit = {
        // Column 0 is the CAST(value AS STRING) column selected above.
        val msg = mapper.readValue(record.getString(0), classOf[Student])
        println(msg)
      }

      override def close(errorOrNull: Throwable): Unit = {}
    })
      .start()
      .awaitTermination()
  }
}


/** Shape of the JSON payload carried in each Kafka record's value. */
case class Student(name: String, age: Int)

0 人点赞