2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021-10-11 10:21:11 浏览数 (1)


案例一 实时数据ETL架构

     在实际实时流式项目中,无论使用Storm、SparkStreaming、Flink及Structured Streaming处理流式数据时,往往先从Kafka 消费原始的流式数据,经过ETL后将其存储到Kafka Topic中,以便其他业务相关应用消费数据,实时处理分析,技术架构流程图如下所示:

     接下来模拟产生运营商基站数据,实时发送到Kafka 中,使用StructuredStreaming消费,经过ETL(获取通话状态为success数据)后,写入Kafka中,便于其他实时应用消费处理分析。

​​​​​​​准备主题

创建Topic,相关命令如下:

代码语言:javascript复制
#查看topic信息

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

#删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic



#创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic



#模拟生产者

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic



#模拟消费者

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning

​​​​​​​模拟基站日志数据

运行如下代码,实时产生模拟日志数据,发送Kafka Topic:

代码语言:javascript复制
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
 * 模拟产生基站日志数据,实时发送Kafka Topic中,数据字段信息:
 * 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间,通话时长
 */
object MockStationLog {
  def main(args: Array[String]): Unit = {
    // 发送Kafka Topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    val allStatus = Array(
      "fail", "busy", "barring", "success", "success", "success",
      "success", "success", "success", "success", "success", "success"
    )

    while (true) {
      val callOut: String = "1860000d".format(random.nextInt(10000))
      val callIn: String = "1890000d".format(random.nextInt(10000))
      val callStatus: String = allStatus(random.nextInt(allStatus.length))
      val callDuration = if ("success".equals(callStatus)) (1   random.nextInt(10)) * 1000L else 0L

      // 随机产生一条基站日志数据
      val stationLog: StationLog = StationLog(
        "station_"   random.nextInt(10),
        callOut,
        callIn,
        callStatus,
        System.currentTimeMillis(),
        callDuration
      )
      println(stationLog.toString)
      Thread.sleep(100   random.nextInt(100))

      val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
      producer.send(record)
    }

    producer.close() // 关闭连接
  }

  /**
   * 基站通话日志数据
   */
  case class StationLog(
                         stationId: String, //基站标识符ID
                         callOut: String, //主叫号码
                         callIn: String, //被叫号码
                         callStatus: String, //通话状态
                         callTime: Long, //通话时间
                         duration: Long //通话时长
                       ) {
    override def toString: String = {
      s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
    }
  }

}

运行程序,基站通话日志数据格式如下:

​​​​​​​实时增量ETL

编写代码实时从Kafka的【stationTopic】消费数据,经过处理分析后,存储至Kafka的【etlTopic】,其中需要设置检查点目录,保证应用一次且仅一次的语义。

代码语言:javascript复制
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * 实时从Kafka Topic消费基站日志数据,过滤获取通话转态为success数据,再存储至Kafka Topic中
 * 1、从KafkaTopic中获取基站日志数据
 * 2、ETL:只获取通话状态为success日志数据
 * 3、最终将ETL的数据存储到Kafka Topic中
 */
object StructuredEtlSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 1. 从KAFKA读取数据
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .load()

    // 2. 对基站日志数据进行ETL操作
    // station_0,18600004405,18900009049,success,1589711564033,9000
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      // 获取value字段的值,转换为String类型
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      // 过滤数据:通话状态为success
      .filter(log => StringUtils.isNoneBlank(log) && "success".equals(log.trim.split(",")(3)))
    etlStreamDF.printSchema()

    // 3. 针对流式应用来说,输出的是流
    val query: StreamingQuery = etlStreamDF.writeStream
      // 对流式应用输出来说,设置输出模式
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "etlTopic")
      // 设置检查点目录
      .option("checkpointLocation", "./ckp"   System.currentTimeMillis())
      .start()
    query.awaitTermination()
    query.stop()
  }
}

0 人点赞