环境:
scala:2.12
spark:3.1.2
本文介绍spark从kafka获取数据,并进行反序列化
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.ForeachWriter
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
/**
 * Reads messages from a Kafka topic with Spark Structured Streaming and
 * deserializes each JSON payload into a [[Student]] using Jackson.
 *
 * Runs until the query is terminated externally; each parsed record is
 * printed to stdout by the ForeachWriter sink.
 */
object KafkaToPostgresql {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("kafka to postgresql")
      .master("local[2]")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
      .option("subscribe", "topic-name")
      .option("startingOffsets", "latest")
      .load()

    // Show the fixed Kafka-source schema (key, value, topic, partition, offset, ...).
    df.printSchema()

    // Kafka delivers the payload as binary; cast it to a UTF-8 string column.
    val rowDataset = df.selectExpr("CAST(value AS STRING)")

    rowDataset.writeStream.foreach(new ForeachWriter[Row]() {
      // ObjectMapper is expensive to build and configure, so create it once
      // per partition in open() rather than once per record in process()
      // (the original allocated a fresh mapper for every message).
      @transient private var mapper: ObjectMapper = _

      override def open(partitionId: Long, epochId: Long): Boolean = {
        mapper = new ObjectMapper()
        // Tolerate JSON fields that Student does not declare.
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        // Required so Jackson can bind JSON to Scala case classes.
        mapper.registerModule(DefaultScalaModule)
        true
      }

      override def process(record: Row): Unit = {
        // Column 0 is the CAST(value AS STRING) projection above.
        val msg = mapper.readValue(record.getString(0), classOf[Student])
        println(msg)
      }

      override def close(errorOrNull: Throwable): Unit = {}
    })
      .start()
      .awaitTermination()
  }
}
/** A student record deserialized from a Kafka JSON message. */
case class Student(name: String, age: Int)