客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)

2022-03-04 10:03:20 浏览数 (1)

目录

封装公共接口(根据存储介质抽取特质)

封装公共接口(根据存储介质抽取特质)

Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口

实现步骤:

  • etl模块realtime 包下创建 StreamApp  特质
  • 实现方法:创建读取kafka集群指定主题的数据
  • 实现方法:创建execute方法
  • 实现方法:创建save方法
代码语言:javascript复制
package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.kafka.common.internals.Topic
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 这是所有ETL流式处理的基类
 * kudu、es、ck都要实现这个特质
 * 定义三个方法:
 * 1)读取数据
 * 2)处理数据
 * 3)保存数据
 */
trait StreamApp {

  /**
   * 读取数据的方法
   * @param sparkSession    SparkSession
   * @param topic           指定消费的主题
   * @param selectExpr      默认值:CAST(value AS STRING)
   */
  def getKafkaSource(sparkSession: SparkSession, topic: String, selectExpr:String = "CAST(value AS STRING)") = {
    sparkSession.readStream.format(Configuration.SPARK_KAFKA_FORMAT)
      .options(Map(
        "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
        "subscribe" -> topic,
        "group.id" -> "logistics", //该参数可以省略,不需要指定(官网提到改参数不能设置: kafka的source会在每次query的时候自定创建唯一的group id)
        //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
        "failOnDataLoss" -> "false"
      )).load().selectExpr(selectExpr)
  }

  /**
   * 数据的处理
   * @param sparkConf
   */
  def execute(sparkConf: SparkConf)

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true)
}

0 人点赞