文章目录
- 引言
- 今天给大家带来一个Spark综合练习案例--电影评分
- 总结
引言
大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,一个平凡而不平庸的人。
今天给大家带来一个Spark综合练习案例–电影评分
老师:给定需求统计评分次数>200的电影平均分Top10,并写入Mysql数据库中
我:所有字我都认识,怎么连在一起我就不认识了
不管了先new个实例对象,总没错吧
代码语言:javascript复制val sparkSession = SparkSession
.builder()
.config("spark.sql.shuffle.partitions", "4")
.appName("电影数据分析")
.master("local[2]")
.getOrCreate()
然后大数据无非输入,转换,输出,我再弄个spark读取文件?
代码语言:javascript复制 val lines: RDD[String] = sparkSession.read.textFile("E:\xx\SparkDemo\input\ratings.dat").rdd
再然后RDD转换成DF
代码语言:javascript复制val rdd: RDD[(Int, Int, Int, Long)] = lines.mapPartitions { item => {
item.map { line => {
val strings: Array[String] = line.trim.split("::")
(strings(0).toInt, strings(1).toInt, strings(2).toInt, strings(3).toLong)
}
}
}
}
import sparkSession.implicits._
val reusltDF: DataFrame = rdd.toDF("user_id", "item_id", "rating", "timestamp")
测试一下行不行
代码语言:javascript复制// 查看约束
reusltDF.printSchema()
//查看数据
reusltDF.show()
好像跑通了!!笑容逐渐放肆~什么SQL不整了,上来直接DSL
代码语言:javascript复制val resultDS: Dataset[Row] = reusltDF
//a.对数据按电影id进行分组
.groupBy($"item_id")
//b.对聚合数据求平均值和评分次数
.agg(
round(avg($"rating"), 2).as("avg_rating"),
count($"user_id").as("cnt_rating")
)
//c.过滤出评分大于2000的
.filter($"cnt_rating" > 2000)
//d.按照评分的平均值进行降序排序
.orderBy($"avg_rating".desc)
//e.取前十条数据
.limit(10)
最后最后保存到Mysql SaveToMysql(resultDF);
代码语言:javascript复制/**
* 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
*/
def saveToMySQL(dataFrame: DataFrame): Unit = {
dataFrame.rdd.coalesce(1).foreachPartition{ iter =>
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
"root", //
"123456"
)
// c. 获取PreparedStatement对象
val insertSql ="""
|INSERT
|O
| db_test.demo
| (item_id, avg_rating, cnt_rating)
|VALUES (?, ?, ?)
|""".stripMargin
pstmt = conn.prepareStatement(insertSql)
conn.setAutoCommit(false)
// d. 将分区中数据插入到表中,批量插入
iter.foreach{ row =>
pstmt.setInt(1, row.getAs[Int]("item_id"))
pstmt.setInt(2, row.getAs[Int]("avg_rating"))
pstmt.setInt(3, row.getAs[Int]("cnt_rating"))
// 加入批次
pstmt.addBatch()
}
// TODO: 批量插入
pstmt.executeBatch()
conn.commit()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
} }
大功告成了!
总结
以上便是电影评分数据分析spark版,愿你读过之后有自己的收获,如果有收获不妨一键三连一下~