In the previous article we covered the quiz module of the offline data warehouse. In this article we look at the real-time part of the online education project.
The code for this article can be found in the open-source project https://github.com/SoundHearer/kuaiban.
The architecture of the real-time part is shown below.
Raw data formats and their corresponding topics
Real-time registration count
Topic: register_topic
Data format:
User ID | Platform ID (1: PC, 2: APP, 3: Others) | Creation time |
---|---|---|
85571 | 1 | 2019-07-16 16:01:55 |
Quiz correctness rate and knowledge-point mastery data format
Topic: qz_log
User ID | Course ID | Knowledge point ID | Question ID | Correct (0: wrong, 1: correct) | Creation time |
---|---|---|---|---|---|
1005 | 505 | 29 | 1 | 1 | 2019-09-12 11:17:48 |
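The register_topic and qz_log records are plain tab-separated lines, which is why the streaming jobs below filter on split("\t").length. A minimal sketch of how one qz_log line breaks into fields, assuming that delimiter:
// Hypothetical sample line; the fields follow the table above, separated by tabs
val line = "1005\t505\t29\t1\t1\t2019-09-12 11:17:48"
val Array(uid, courseid, pointid, questionid, istrue, createtime) = line.split("\t")
// istrue == "1" means the answer was correct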
Product page to order page and order page to payment page data format
{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}
Topic: page_topic
uid: user id
app_id: platform id
device_id: device id
distinct_id: unique identifier
ip: user IP address
last_page_id: previous page id
page_id: current page id (0: home page, 1: product/course page, 2: order page, 3: payment page)
next_page_id: next page id
Real-time per-student video playback duration statistics
Topic: course_learn
{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}
biz: unique identifier
chapterid: chapter id
cwareid: courseware id
edutypeid: education type id
subjectid: subject id
ps: start of the playback interval (video position)
pe: end of the playback interval (video position)
sourceType: playback platform
speed: playback speed
ts: playback start time (timestamp, ms)
te: playback end time (timestamp, ms)
videoid: video id
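For reference, here is a minimal sketch of how one such JSON record can be turned into typed fields. It assumes the fastjson library, which the ParseJsonData helper used in the jobs below appears to wrap; the record is abridged from the sample above.
import com.alibaba.fastjson.JSON
val record = """{"uid":"235","cwareid":"3","videoid":"2","chapterid":"1","edutypeid":"3","subjectid":"2","sourceType":"APP","speed":"2","ts":"1563352159417","te":"1563352166417","ps":"41","pe":"55"}"""
val obj = JSON.parseObject(record)
val uid = obj.getIntValue("uid") // 235
val ps = obj.getIntValue("ps") // playback interval start: 41
val pe = obj.getIntValue("pe") // playback interval end: 55
val ts = obj.getLong("ts") // wall-clock playback start (ms)
val te = obj.getLong("te") // wall-clock playback end (ms)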
Creating the topics
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic register_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic page_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic course_learn --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic qz_log --partitions 10 --replication-factor 2
Simulating data collection
The log files are sent to the topics through a Kafka producer; the source log files can be found in the open-source project https://github.com/SoundHearer/kuaiban.
Taking course_learn.log as an example, we upload the log to HDFS; the producer code is as follows:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
object CourseLearnProducer {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("courseProducer").setMaster("local[*]")
val ssc = new SparkContext(sparkConf)
// System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master")
val resultLog = ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/data/course_learn.log", 10)
.foreachPartition(partitoin => {
val props = new Properties()
props.put("bootstrap.servers", "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092")
props.put("acks", "1")
props.put("batch.size", "16384")
props.put("linger.ms", "10")
props.put("buffer.memory", "33554432")
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
partitoin.foreach(item => {
val msg = new ProducerRecord[String, String]("course_learn", item)
producer.send(msg)
})
producer.flush()
producer.close()
})
}
}
Implementation
Real-time registration statistics
Users register on the website or APP; the backend collects the events in real time and pushes them to Kafka, and Spark Streaming consumes them to produce a live registration count.
Requirement 1: count registrations in real time with a 3-second batch, using the updateStateByKey operator to combine the current batch with the historical total. Only this requirement uses updateStateByKey; the later requirements do not.
Requirement 2: every 6 seconds, compute the number of registrations within the last 1 minute; no historical data is needed (hint: the reduceByKeyAndWindow operator), as sketched below.
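For requirement 2, the windowed count can be written with reduceByKeyAndWindow over the same (platform, 1) pairs that the job below produces; a sketch, assuming a 60-second window sliding every 6 seconds (both multiples of the 3-second batch interval):
// Registrations per platform over the last 60 seconds, refreshed every 6 seconds; no state is kept
resultDStream
.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6))
.print()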
import java.lang
import java.sql.ResultSet
import java.util.Random
import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RegisterStreaming {
private val groupid = "register_group_test"
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hdfs")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
.set("spark.streaming.kafka.maxRatePerPartition", "50")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(3))
val topics = Array("register_topic")
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
)
ssc.checkpoint("hdfs://cdh1.macro.com:8020/user/catelf/sparkstreaming/checkpoint")
//check whether offsets for this group are stored in MySQL
val sqlProxy = new SqlProxy()
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close() //close the cursor
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
//set Kafka consumer parameters; if saved offsets exist, resume from them, otherwise consume from the beginning
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
mapPartitions(partitions => {
partitions.map(item => {
val line = item.value()
val arr = line.split("\t")
val app_name = arr(1) match {
case "1" => "PC"
case "2" => "APP"
case _ => "Other"
}
(app_name, 1)
})
})
resultDStream.cache()
// resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum //sum for the current batch
val previousCount = state.getOrElse(0) //historical total
Some(currentCount + previousCount)
}
resultDStream.updateStateByKey(updateFunc).print()
//after the business logic, manually commit the offsets and persist them in MySQL
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
}
Real-time quiz correctness rate and knowledge-point mastery
Users answer questions on the website or APP; when they click the submit button, the answer records are sent to Kafka. Downstream, Spark Streaming consumes the topic, computes the correctness rate and mastery in real time and writes them to MySQL, so that users who refresh the page after submitting immediately see the details of their answers.
Requirement 1: Spark Streaming must not lose data, handle about 100 records per second, and maintain offsets manually.
Requirement 2: answers by the same user for the same course and the same knowledge point must be deduplicated against historical data, recording the deduplicated question ids and their count.
Requirement 3: compute the correctness rate per knowledge point: correct answers / total answers, kept to two decimal places.
Requirement 4: compute the knowledge-point mastery: (deduplicated question count / total questions under the knowledge point, known to be 30) * the knowledge point's correctness rate. A worked example follows.
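As a quick sanity check of the two formulas with made-up numbers: suppose a user has submitted 10 answers under one knowledge point, 8 of them correct, covering 6 distinct questions out of the 30 in that point:
val qzSum = 10 // total answers submitted (hypothetical)
val qzIstrue = 8 // correct answers
val distinctDone = 6 // distinct questions attempted after deduplication
val correctRate = qzIstrue.toDouble / qzSum // 0.8
val masteryRate = (distinctDone.toDouble / 30) * correctRate // 0.2 * 0.8 = 0.16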
import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Real-time knowledge-point mastery statistics
*/
object QzPointStreaming {
private val groupid = "qz_point_group"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
.set("spark.streaming.kafka.maxRatePerPartition", "50")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(3))
val topics = Array("qz_log")
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
)
//check whether offsets for this group are stored in MySQL
val sqlProxy = new SqlProxy()
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close() //close the cursor
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
//set Kafka consumer parameters; if saved offsets exist, resume from them, otherwise consume from the beginning
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
//filter out malformed records and extract the fields
val dsStream = stream.filter(item => item.value().split("\t").length == 6).
mapPartitions(partition => partition.map(item => {
val line = item.value()
val arr = line.split("\t")
val uid = arr(0) //user id
val courseid = arr(1) //course id
val pointid = arr(2) //knowledge point id
val questionid = arr(3) //question id
val istrue = arr(4) //correct or not
val createtime = arr(5) //creation time
(uid, courseid, pointid, questionid, istrue, createtime)
}))
dsStream.foreachRDD(rdd => {
//group records of the same user, same course and same knowledge point
val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
groupRdd.foreachPartition(partition => {
//get a JDBC connection per partition
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partition.foreach { case (key, iters) =>
qzQuestionUpdate(key, iters, sqlProxy, client) //update the question-bank statistics
}
} catch {
case e: Exception => e.printStackTrace()
}
finally {
sqlProxy.shutdown(client)
}
}
)
})
//after the business logic, manually commit the offsets and persist them in MySQL
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Update the knowledge-point statistics tables
*
* @param key
* @param iters
* @param sqlProxy
* @param client
* @return
*/
def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
val keys = key.split("-")
val userid = keys(0).toInt
val courseid = keys(1).toInt
val pointid = keys(2).toInt
val array = iters.toArray
val questionids = array.map(_._4).distinct //deduplicate the question ids in the current batch
//query the historical question ids
var questionids_history: Array[String] = Array()
sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
Array(userid, courseid, pointid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
questionids_history = rs.getString(1).split(",")
}
rs.close() //close the cursor
}
})
//merge the historical ids with the current batch and deduplicate again
val resultQuestionid = questionids.union(questionids_history).distinct
val countSize = resultQuestionid.length
val resultQuestionid_str = resultQuestionid.mkString(",")
val qz_count = questionids.length //number of deduplicated questions in this batch
var qz_sum = array.length //total answers in the current batch
var qz_istrue = array.filter(_._5.equals("1")).size //correct answers in the current batch
val createtime = array.map(_._6).min //earliest creation time, used as the row's create time
//update qz_point_history, which stores the question ids the user has already answered
val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
" on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))
var qzSum_history = 0
var istrue_history = 0
sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
Array(userid, courseid, pointid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
qzSum_history = rs.getInt(1)
istrue_history = rs.getInt(2)
}
rs.close()
}
})
qz_sum += qzSum_history
qz_istrue += istrue_history
val correct_rate = qz_istrue.toDouble / qz_sum.toDouble //correctness rate
//compute the mastery
//assume each knowledge point has 30 questions in total: first compute the completion ratio, then the mastery
val qz_detail_rate = countSize.toDouble / 30 //completion ratio; if all 30 questions were answered, mastery equals the correctness rate
val mastery_rate = qz_detail_rate * correct_rate
sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
" values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))
}
}
Real-time conversion rates: product page to order page and order page to payment page
Users browse the course home page, click to place an order (jumping to the order page), then click pay (jumping to the payment page). The page-jump JSON events are collected and parsed to compute each page's click count and the conversion rates, plus the top-3 provinces by click count based on the ip field, accumulated over historical data.
Requirement 1: compute the total views of the home page, the order page and the payment page.
Requirement 2: compute the conversion rate from the product/course page to the order page, and from the order page to the payment page (see the sketch below).
Requirement 3: resolve each ip to its province and show the top-3 provinces by click count, accumulated with historical data.
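The conversion rates in requirement 2 are plain ratios of the page view counts formatted as percentages, which is also how calcJumRate below computes them. A sketch with hypothetical counts:
import java.text.NumberFormat
val page1Num = 1000L // product/course page views (hypothetical)
val page2Num = 250L // order page views (hypothetical)
val page3Num = 100L // payment page views (hypothetical)
val nf = NumberFormat.getPercentInstance
val page1ToPage2 = if (page1Num == 0) "0%" else nf.format(page2Num.toDouble / page1Num) // 25%
val page2ToPage3 = if (page2Num == 0) "0%" else nf.format(page3Num.toDouble / page2Num) // 40%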
import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat
import com.catelf.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* Real-time page conversion rate statistics
*/
object PageStreaming {
private val groupid = "vip_count_groupid"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
.set("spark.streaming.kafka.maxRatePerPartition", "30")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(3))
val topics = Array("page_topic")
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
)
//check whether offsets for this group are stored in MySQL
val sqlProxy = new SqlProxy()
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close()
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
//set Kafka consumer parameters; if saved offsets exist, resume from them, otherwise consume from the beginning
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
//parse the JSON records
val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
partition.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val uid = if (jsonObject.containsKey("uid")) jsonObject.getString("uid") else ""
val app_id = if (jsonObject.containsKey("app_id")) jsonObject.getString("app_id") else ""
val device_id = if (jsonObject.containsKey("device_id")) jsonObject.getString("device_id") else ""
val ip = if (jsonObject.containsKey("ip")) jsonObject.getString("ip") else ""
val last_page_id = if (jsonObject.containsKey("last_page_id")) jsonObject.getString("last_page_id") else ""
val pageid = if (jsonObject.containsKey("page_id")) jsonObject.getString("page_id") else ""
val next_page_id = if (jsonObject.containsKey("next_page_id")) jsonObject.getString("next_page_id") else ""
(uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
})
}).filter(item => {
!item._5.equals("") && !item._6.equals("") && !item._7.equals("")
})
dsStream.cache()
val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
val resultDStream = pageValueDStream.reduceByKey(_ + _)
resultDStream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
//get a JDBC connection per partition
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partition.foreach(item => {
calcPageJumpCount(sqlProxy, item, client) //update page-jump counts
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
})
ssc.sparkContext.addFile("hdfs://cdh1.macro.com:8020/user/catelf/data/ip2region.db") //distribute the ip2region db file to the executors
val ipDStream = dsStream.mapPartitions(patitions => {
val dbFile = SparkFiles.get("ip2region.db")
val ipsearch = new DbSearcher(new DbConfig(), dbFile)
patitions.map { item =>
val ip = item._4
val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2) //region detail looks like 中国|0|上海|上海市|有线通; index 2 is the province
(province, 1L) //count clicks per province
}
}).reduceByKey(_ + _)
ipDStream.foreachRDD(rdd => {
//load the historical data from MySQL and turn it into an RDD
val ipSqlProxy = new SqlProxy()
val ipClient = DataSourceUtil.getConnection
try {
val history_data = new ArrayBuffer[(String, Long)]()
ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val tuple = (rs.getString(1), rs.getLong(2))
history_data += tuple
}
}
})
val history_rdd = ssc.sparkContext.makeRDD(history_data)
val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
val province = item._1
val nums = item._2._1.getOrElse(0L) + item._2._2.getOrElse(0L)
(province, nums)
})
resultRdd.foreachPartition(partitions => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitions.foreach(item => {
val province = item._1
val num = item._2
//upsert the latest totals into MySQL
sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
Array(province, num, num))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
sqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
top3Rdd.foreach(item => {
sqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(ipClient)
}
})
//compute the conversion rates
//after the business logic, manually commit the offsets and persist them in MySQL
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
calcJumRate(sqlProxy, client) //compute the conversion rates
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Update page-jump counts
*
* @param sqlProxy
* @param item
* @param client
*/
def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
val keys = item._1.split("_")
var num: Long = item._2
val page_id = keys(1).toInt //current page_id
val last_page_id = keys(0).toInt //previous page_id
val next_page_id = keys(2).toInt //next page_id
//query the historical num for the current page_id
sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(page_id), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
num = rs.getLong(1)
}
rs.close()
}
//update num, and check whether the current page_id is the first page of the funnel
if (page_id == 1) {
sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
"values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
} else {
sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
"values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
}
})
}
/**
* Compute the conversion rates
*/
def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
var page1_num = 0L
var page2_num = 0L
var page3_num = 0L
sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(1), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
page1_num = rs.getLong(1)
}
}
})
sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(2), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
page2_num = rs.getLong(1)
}
}
})
sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(3), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
page3_num = rs.getLong(1)
}
}
})
val nf = NumberFormat.getPercentInstance
val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
}
}
Real-time per-student video playback duration statistics
Users learn by playing course videos online. The backend records the playback interval (start and end position within the video) and the wall-clock start and end time of each play, and the collected data is sent to Kafka. We need to compute each user's total playback time, effective playback time and completed playback time, as well as the total playback time per dimension.
Requirement 1: total playback time per chapter (aggregate by chapterid).
Requirement 2: total playback time per courseware (aggregate by cwareid).
Requirement 3: total playback time per education type (aggregate by edutypeid).
Requirement 4: total playback time per playback platform (aggregate by sourceType).
Requirement 5: total playback time per subject (aggregate by subjectid).
Requirement 6: compute each user's total, effective and completed playback time per video. The historical playback intervals must be recorded, and intervals the user has already watched must not be counted again toward the effective and completed time.
Total playback time: (te - ts) / 1000, rounded down, in seconds.
Completed time: based on pe - ps, deduplicated against the historical intervals.
Effective time: (te - ts) / 1000 divided by (pe - ps) gives the real time needed per second of video; multiply that by the completed time. A worked example follows.
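Using the sample record above (ts=1563352159417, te=1563352166417, ps=41, pe=55, speed 2) and assuming no previously recorded interval overlaps 41-55, the three metrics work out as below; when history does overlap, the completed span is first reduced by the overlap, as the getEffectiveInterval helper further down does.
val ts = 1563352159417L // wall-clock start (ms)
val te = 1563352166417L // wall-clock end (ms)
val ps = 41 // video position at start (s)
val pe = 55 // video position at end (s)
val totalDuration = (te - ts) / 1000 // 7 real seconds of playback
val completeDuration = pe - ps // 14 seconds of video covered (played at 2x speed)
val effectiveDuration = totalDuration.toDouble / (pe - ps) * completeDuration // 0.5 real s per video s * 14 = 7.0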
import java.lang
import java.sql.{Connection, ResultSet}
import com.catelf.qzpoint.bean.LearnModel
import com.catelf.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
object CourseLearnStreaming {
private val groupid = "course_learn_test2"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
.set("spark.streaming.kafka.maxRatePerPartition", "30")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(3))
val topics = Array("course_learn")
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
)
//check whether offsets for this group are stored in MySQL
val sqlProxy = new SqlProxy()
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close()
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
//set Kafka consumer parameters; if saved offsets exist, resume from them, otherwise consume from the beginning
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
//parse the JSON records
val dsStream = stream.mapPartitions(partitions => {
partitions.map(item => {
val json = item.value()
val jsonObject = ParseJsonData.getJsonData(json)
val userId = jsonObject.getIntValue("uid")
val cwareId = jsonObject.getIntValue("cwareid")
val videoId = jsonObject.getIntValue("videoid")
val chapterId = jsonObject.getIntValue("chapterid")
val edutypeId = jsonObject.getIntValue("edutypeid")
val subjectId = jsonObject.getIntValue("subjectid")
val sourceType = jsonObject.getString("sourceType")
val speed = jsonObject.getIntValue("speed")
val ts = jsonObject.getLong("ts")
val te = jsonObject.getLong("te")
val ps = jsonObject.getIntValue("ps")
val pe = jsonObject.getIntValue("pe")
LearnModel(userId, cwareId, videoId, chapterId, edutypeId, subjectId, sourceType, speed, ts, te, ps, pe)
})
})
dsStream.foreachRDD(rdd => {
rdd.cache()
//compute effective, completed and total playback time per video
rdd.groupBy(item => item.userId + "_" + item.cwareId + "_" + item.videoId).foreachPartition(partitoins => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitoins.foreach { case (key, iters) =>
calcVideoTime(key, iters, sqlProxy, client) //compute the video playback times
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
//total playback time per chapter
rdd.mapPartitions(partitions => {
partitions.map(item => {
val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
val key = item.chapterId
(key, totaltime)
})
}).reduceByKey(_ + _)
.foreachPartition(partitoins => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitoins.foreach(item => {
sqlProxy.executeUpdate(client, "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key" +
" update totaltime=totaltime+?", Array(item._1, item._2, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
//total playback time per courseware
rdd.mapPartitions(partitions => {
partitions.map(item => {
val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
val key = item.cwareId
(key, totaltime)
})
}).reduceByKey(_ + _).foreachPartition(partitions => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitions.foreach(item => {
sqlProxy.executeUpdate(client, "insert into cwareid_learn_detail(cwareid,totaltime) values(?,?) on duplicate key " +
"update totaltime=totaltime+?", Array(item._1, item._2, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
//total playback time per education type
rdd.mapPartitions(partitions => {
partitions.map(item => {
val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
val key = item.edutypeId
(key, totaltime)
})
}).reduceByKey(_ + _).foreachPartition(partitions => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitions.foreach(item => {
sqlProxy.executeUpdate(client, "insert into edutype_learn_detail(edutypeid,totaltime) values(?,?) on duplicate key " +
"update totaltime=totaltime+?", Array(item._1, item._2, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
//total playback time per playback platform
rdd.mapPartitions(partitions => {
partitions.map(item => {
val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
val key = item.sourceType
(key, totaltime)
})
}).reduceByKey(_ + _).foreachPartition(partitions => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partitions.foreach(item => {
sqlProxy.executeUpdate(client, "insert into sourcetype_learn_detail (sourcetype,totaltime) values(?,?) on duplicate key " +
"update totaltime=totaltime+?", Array(item._1, item._2, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
// total playback time per subject
rdd.mapPartitions(partitions => {
partitions.map(item => {
val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
val key = item.subjectId
(key, totaltime)
})
}).reduceByKey(_ + _).foreachPartition(partitons => {
val sqlProxy = new SqlProxy()
val clinet = DataSourceUtil.getConnection
try {
partitons.foreach(item => {
sqlProxy.executeUpdate(clinet, "insert into subject_learn_detail(subjectid,totaltime) values(?,?) on duplicate key " +
"update totaltime=totaltime+?", Array(item._1, item._2, item._2))
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(clinet)
}
})
})
//after the business logic, manually commit the offsets and persist them in MySQL
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Compute the effective, completed and total playback time of a video
*
* @param key
* @param iters
* @param sqlProxy
* @param client
*/
def calcVideoTime(key: String, iters: Iterable[LearnModel], sqlProxy: SqlProxy, client: Connection) = {
val keys = key.split("_")
val userId = keys(0).toInt
val cwareId = keys(1).toInt
val videoId = keys(2).toInt
//query the historical playback intervals
var interval_history = ""
sqlProxy.executeQuery(client, "select play_interval from video_interval where userid=? and cwareid=? and videoid=?",
Array(userId, cwareId, videoId), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
interval_history = rs.getString(1)
}
rs.close()
}
})
var effective_duration_sum = 0L //total effective time
var complete_duration_sum = 0L //total completed time
var cumulative_duration_sum = 0L //total playback time
val learnList = iters.toList.sortBy(item => item.ps) //to a list, sorted by interval start ascending
learnList.foreach(item => {
if ("".equals(interval_history)) {
//没有历史区间
val play_interval = item.ps "-" item.pe //有效区间
val effective_duration = Math.ceil((item.te - item.ts) / 1000) //有效时长
val complete_duration = item.pe - item.ps //完成时长
effective_duration_sum = effective_duration.toLong
cumulative_duration_sum = effective_duration.toLong
complete_duration_sum = complete_duration
interval_history = play_interval
} else {
//有历史区间进行对比
val interval_arry = interval_history.split(",").sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
val tuple = getEffectiveInterval(interval_arry, item.ps, item.pe)
val complete_duration = tuple._1 //获取实际有效完成时长
val effective_duration = Math.ceil((item.te - item.ts) / 1000) / (item.pe - item.ps) * complete_duration //计算有效时长
val cumulative_duration = Math.ceil((item.te - item.ts) / 1000) //累计时长
interval_history = tuple._2
effective_duration_sum = effective_duration.toLong
complete_duration_sum = complete_duration
cumulative_duration_sum = cumulative_duration.toLong
}
sqlProxy.executeUpdate(client, "insert into video_interval(userid,cwareid,videoid,play_interval) values(?,?,?,?) "
"on duplicate key update play_interval=?", Array(userId, cwareId, videoId, interval_history, interval_history))
sqlProxy.executeUpdate(client, "insert into video_learn_detail(userid,cwareid,videoid,totaltime,effecttime,completetime) "
"values(?,?,?,?,?,?) on duplicate key update totaltime=totaltime ?,effecttime=effecttime ?,completetime=completetime ?",
Array(userId, cwareId, videoId, cumulative_duration_sum, effective_duration_sum, complete_duration_sum, cumulative_duration_sum,
effective_duration_sum, complete_duration_sum))
})
}
/**
* Compute the effective interval
*
* @param array
* @param start
* @param end
* @return
*/
def getEffectiveInterval(array: Array[String], start: Int, end: Int) = {
var effective_duration = end - start
var bl = false //whether an existing interval was modified
import scala.util.control.Breaks._
breakable {
for (i <- 0 until array.length) {
//iterate over the recorded intervals
var historyStart = 0 //start of this recorded interval
var historyEnd = 0 //end of this recorded interval
val item = array(i)
try {
historyStart = item.split("-")(0).toInt
historyEnd = item.split("-")(1).toInt
} catch {
case e: Exception => throw new Exception("error array:" + array.mkString(","))
}
if (start >= historyStart && historyEnd >= end) {
//the recorded interval already covers this play entirely; nothing new is effective
effective_duration = 0
bl = true
break()
} else if (start <= historyStart && end > historyStart && end < historyEnd) {
//overlaps the left side of a recorded interval; subtract the overlapping part (the old data wins)
effective_duration -= end - historyStart
array(i) = start + "-" + historyEnd
bl = true
} else if (start > historyStart && start < historyEnd && end >= historyEnd) {
//overlaps the right side of a recorded interval; subtract the overlapping part
effective_duration -= historyEnd - start
array(i) = historyStart + "-" + end
bl = true
} else if (start < historyStart && end > historyEnd) {
//the new interval fully covers the recorded one; subtract the whole recorded interval
effective_duration -= historyEnd - historyStart
array(i) = start + "-" + end
bl = true
}
}
}
val result = bl match {
case false => {
//the original array was not modified (no overlap); append the new interval
val distinctArray2 = ArrayBuffer[String]()
distinctArray2.appendAll(array)
distinctArray2.append(start + "-" + end)
val distinctArray = distinctArray2.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
val tmpArray = ArrayBuffer[String]()
tmpArray.append(distinctArray(0))
for (i <- 1 until distinctArray.length) {
val item = distinctArray(i).split("-")
val tmpItem = tmpArray(tmpArray.length - 1).split("-")
val itemStart = item(0)
val itemEnd = item(1)
val tmpItemStart = tmpItem(0)
val tmpItemEnd = tmpItem(1)
if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
//no overlap
tmpArray.append(itemStart + "-" + itemEnd)
} else {
//overlap: merge the two intervals
val resultStart = tmpItemStart
val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
}
}
val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
play_interval
}
case true => {
//the original array was modified; rebuild and merge the intervals
val distinctArray = array.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
val tmpArray = ArrayBuffer[String]()
tmpArray.append(distinctArray(0))
for (i <- 1 until distinctArray.length) {
val item = distinctArray(i).split("-")
val tmpItem = tmpArray(tmpArray.length - 1).split("-")
val itemStart = item(0)
val itemEnd = item(1)
val tmpItemStart = tmpItem(0)
val tmpItemEnd = tmpItem(1)
if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
//no overlap
tmpArray.append(itemStart + "-" + itemEnd)
} else {
//overlap: merge the two intervals
val resultStart = tmpItemStart
val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
}
}
val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
play_interval
}
}
(effective_duration, result)
}
}