Case 1: Real-Time Data ETL Architecture
In real-world streaming projects, whether the processing engine is Storm, Spark Streaming, Flink, or Structured Streaming, the raw stream is usually consumed from Kafka first, run through ETL, and then written back to a Kafka topic so that other business applications can consume it for real-time processing and analysis. The technical architecture flows as: raw data in Kafka → streaming ETL → cleaned data in Kafka → downstream real-time applications.
Next, we simulate carrier base-station data and send it to Kafka in real time, consume it with Structured Streaming, run it through ETL (keeping only records whose call status is success), and write the result back to Kafka so that other real-time applications can consume, process, and analyze it.
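As a rough code sketch of this flow (assuming an already-created SparkSession named spark and the spark-sql-kafka-0-10 integration package on the classpath; the broker address and topic names match the setup below, and the full runnable version is developed step by step in this case):

import spark.implicits._

// Kafka source: raw station logs
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node1:9092")
  .option("subscribe", "stationTopic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// ETL: keep only records whose call status field is "success"
val cleaned = raw.filter(_.split(",")(3) == "success")

// Kafka sink: publish the cleaned records for downstream real-time applications
cleaned.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node1:9092")
  .option("topic", "etlTopic")
  .option("checkpointLocation", "./ckp")
  .start()
  .awaitTermination()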
Prepare Topics
Create the required topics; the relevant commands are as follows:
# view topic info
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# delete topics
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic
# create topics
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic
# console producers (for manual testing)
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic
# console consumers (for manual testing)
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning
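If you would rather create the topics from code instead of the shell, the sketch below uses Kafka's AdminClient API; this is an alternative I am adding, not part of the original case, and it assumes the kafka-clients dependency is available (the producer code in the next step needs it anyway). Broker address, partition count, and replication factor match the commands above.

package cn.itcast.structedstreaming

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

/**
 * Alternative to the CLI above: create stationTopic and etlTopic programmatically.
 * (Illustrative sketch; the object name CreateEtlTopics is not part of the original case.)
 */
object CreateEtlTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092")
    val admin = AdminClient.create(props)
    try {
      // 3 partitions, replication factor 1, matching the kafka-topics.sh commands
      val topics = List(
        new NewTopic("stationTopic", 3, 1.toShort),
        new NewTopic("etlTopic", 3, 1.toShort)
      )
      admin.createTopics(topics.asJava).all().get()
    } finally {
      admin.close()
    }
  }
}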
Mock Station Log Data
Run the following code to generate mock log data in real time and send it to the Kafka topic:
package cn.itcast.structedstreaming

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random

/**
 * Generates mock station (cell-site) log data and sends it to a Kafka topic in real time.
 * Record fields: station ID, caller number, callee number, call status, call time, call duration
 */
object MockStationLog {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    val allStatus = Array(
      "fail", "busy", "barring", "success", "success", "success",
      "success", "success", "success", "success", "success", "success"
    )

    while (true) {
      val callOut: String = "1860000%04d".format(random.nextInt(10000))
      val callIn: String = "1890000%04d".format(random.nextInt(10000))
      val callStatus: String = allStatus(random.nextInt(allStatus.length))
      val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L
      // randomly generate one station log record
      val stationLog: StationLog = StationLog(
        "station_" + random.nextInt(10),
        callOut,
        callIn,
        callStatus,
        System.currentTimeMillis(),
        callDuration
      )
      println(stationLog.toString)
      Thread.sleep(100 + random.nextInt(100))

      val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
      producer.send(record)
    }

    producer.close() // close the connection
  }

  /**
   * Station call log record
   */
  case class StationLog(
    stationId: String,  // station ID
    callOut: String,    // caller number
    callIn: String,     // callee number
    callStatus: String, // call status
    callTime: Long,     // call time
    duration: Long      // call duration
  ) {
    override def toString: String = {
      s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
    }
  }
}
Run the program. Each station call log record is a comma-separated line of the form stationId,callOut,callIn,callStatus,callTime,duration, for example:
station_0,18600004405,18900009049,success,1589711564033,9000
Real-Time Incremental ETL
Write code that consumes data in real time from the Kafka topic stationTopic, processes and analyzes it, and stores the result in the Kafka topic etlTopic. A checkpoint directory must be configured so the application can guarantee exactly-once semantics.
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Consumes station log data from a Kafka topic in real time, keeps only records whose
 * call status is success, and stores them in another Kafka topic.
 * 1. Read the station log data from the Kafka topic
 * 2. ETL: keep only log records whose call status is success
 * 3. Store the ETL result in a Kafka topic
 */
object StructuredEtlSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 1. Read data from Kafka
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .load()

    // 2. Apply the ETL to the station log data
    // station_0,18600004405,18900009049,success,1589711564033,9000
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      // extract the value field and cast it to String
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      // keep only records whose call status is success
      .filter(log => StringUtils.isNoneBlank(log) && "success".equals(log.trim.split(",")(3)))
    etlStreamDF.printSchema()

    // 3. For a streaming application the output is itself a stream
    val query: StreamingQuery = etlStreamDF.writeStream
      // set the output mode of the streaming query
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "etlTopic")
      // set the checkpoint directory
      .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
      .start()
    query.awaitTermination()
    query.stop()
  }
}
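As a quick check that downstream applications really can consume the cleaned data, the sketch below reads etlTopic with another Structured Streaming query and prints the records to the console; the kafka-console-consumer.sh command from the topic-preparation step serves the same purpose, and the object name StructuredEtlConsumer is only illustrative, not part of the original case.

package cn.itcast.structedstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
 * Illustrative downstream application: consumes the ETL result from etlTopic
 * and prints it to the console for verification.
 */
object StructuredEtlConsumer {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "etlTopic")
      .load()
      // only the message value is of interest here
      .selectExpr("CAST(value AS STRING)")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
  }
}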