目录
封装公共接口(根据存储介质抽取特质)
封装公共接口(根据存储介质抽取特质)
Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口
实现步骤:
- 在etl模块的 realtime 包下创建 StreamApp 特质
- 实现方法:创建读取kafka集群指定主题的数据
- 实现方法:创建execute方法
- 实现方法:创建save方法
package cn.it.logistics.etl.realtime
import cn.it.logistics.common.Configuration
import org.apache.kafka.common.internals.Topic
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 这是所有ETL流式处理的基类
* kudu、es、ck都要实现这个特质
* 定义三个方法:
* 1)读取数据
* 2)处理数据
* 3)保存数据
*/
trait StreamApp {
/**
* 读取数据的方法
* @param sparkSession SparkSession
* @param topic 指定消费的主题
* @param selectExpr 默认值:CAST(value AS STRING)
*/
def getKafkaSource(sparkSession: SparkSession, topic: String, selectExpr:String = "CAST(value AS STRING)") = {
sparkSession.readStream.format(Configuration.SPARK_KAFKA_FORMAT)
.options(Map(
"kafka.bootstrap.servers" -> Configuration.kafkaAddress,
"subscribe" -> topic,
"group.id" -> "logistics", //该参数可以省略,不需要指定(官网提到改参数不能设置: kafka的source会在每次query的时候自定创建唯一的group id)
//表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
"failOnDataLoss" -> "false"
)).load().selectExpr(selectExpr)
}
/**
* 数据的处理
* @param sparkConf
*/
def execute(sparkConf: SparkConf)
/**
* 数据的保存
* @param dataFrame
* @param tableName
* @param isAutoCreateTable
*/
def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true)
}