文章目录- 引言
- 数据介绍:使用的文件movies.csv和ratings.csv
- 建表语句
- 项目结构一览图
- 由题意可知
- 总结
引言
大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。
这是我的上篇博文,当时仅是做了一个实现案例(demo级别 ),没想到居然让我押中了题,还让我稳稳的及格了(这次测试试卷难度极大,考60分都能在班上排进前10)
不过我在复盘的时候,发现自己的致命弱点:写sql的能力太菜了。。
于是我重做了一遍,并满足了导师提的3个需求:
需求1: 查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分 需求2: 查找每个电影类别及其对应的平均评分 需求3: 查找被评分次数较多的前十部电影
数据介绍:使用的文件movies.csv和ratings.csv
movies.csv该文件是电影数据,对应的为维表数据,其数据格式为 movieId title genres 电影id 电影名称 电影所属分类 样例数据如下所示:逗号分隔 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
ratings.csv该文件为定影评分数据,其数据格式为 userId movieId rating timestamp 电影id 电影名称 电影所属分类 时间戳
建表语句
代码语言:javascript复制CREATE DATABASE db_movies;
USE db_movies;
CREATE TABLE `ten_movies_avgrating` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` int(11) NOT NULL COMMENT '电影id',
`ratingNum` int(11) NOT NULL COMMENT '评分个数',
`title` varchar(100) NOT NULL COMMENT '电影名称',
`avgRating` decimal(10,2) NOT NULL COMMENT '平均评分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `genres_average_rating` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`genres` varchar(100) NOT NULL COMMENT '电影类别',
`avgRating` decimal(10,2) NOT NULL COMMENT '电影类别平均评分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `genres_UNIQUE` (`genres`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `ten_most_rated_films` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` int(11) NOT NULL COMMENT '电影Id',
`title` varchar(100) NOT NULL COMMENT '电影名称',
`ratingCnt` int(11) NOT NULL COMMENT '电影被评分的次数',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
项目结构一览图
由题意可知
先创建实体类,字段是从建表语句中得来的。
Entry.scala
代码语言:javascript复制package cn.movies.Packet
/**
* @author ChinaManor
* #Description Entry
* #Date: 6/6/2021 17:23
*/
object Entry {
case class Movies(
movieId: String, // 电影的id
title: String, // 电影的标题
genres: String // 电影类别
)
case class Ratings(
userId: String, // 用户的id
movieId: String, // 电影的id
rating: String, // 用户评分
timestamp: String // 时间戳
)
// 需求1MySQL结果表
case class tenGreatestMoviesByAverageRating(
movieId: String, // 电影的id
ratingNum:String,
title: String, // 电影的标题
avgRating: String // 电影平均评分
)
// 需求2MySQL结果表
case class topGenresByAverageRating(
genres: String, //电影类别
avgRating: String // 平均评分
)
// 需求3MySQL结果表
case class tenMostRatedFilms(
movieId: String, // 电影的id
title: String, // 电影的标题
ratingCnt: String // 电影被评分的次数
)
}
再创建个表结构~~
Schema.scala
代码语言:javascript复制package cn.movies.Packet
import org.apache.spark.sql.types.{DataTypes, StructType}
/**
* @author ChinaManor
* #Description Schema
* #Date: 6/6/2021 17:34
*/
object Schema {
class SchemaLoader {
// movies数据集schema信息
private val movieSchema = new StructType()
.add("movieId", DataTypes.StringType, false)
.add("title", DataTypes.StringType, false)
.add("genres", DataTypes.StringType, false)
// ratings数据集schema信息
private val ratingSchema = new StructType()
.add("userId", DataTypes.StringType, false)
.add("movieId", DataTypes.StringType, false)
.add("rating", DataTypes.StringType, false)
.add("timestamp", DataTypes.StringType, false)
def getMovieSchema: StructType = movieSchema
def getRatingSchema: StructType = ratingSchema
}
}
然后开始写Main方法,其实只有区区八十行代码。。。
spark总要有实例对象吧。
代码语言:javascript复制// 创建spark session
val spark = SparkSession
.builder
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[4]")
.getOrCreate
然后 new个schema信息
代码语言:javascript复制val schemaLoader = new SchemaLoader
然后尝试读取csv文件, // 读取Movie数据集 val movieDF: DataFrame = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema) // 读取Rating数据集 val ratingDF: DataFrame = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)
发现读取方法和路径都没有,于是补救一下
代码语言:javascript复制 // 文件路径
private val MOVIES_CSV_FILE_PATH = "D:\Users\Administrator\Desktop\exam0601\datas\movies.csv"
private val RATINGS_CSV_FILE_PATH = "D:\Users\Administrator\Desktop\exam0601\datas\ratings.csv"
/**
* 读取数据文件,转成DataFrame
*
* @param spark
* @param path
* @param schema
* @return
*/
def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
val dataDF: DataFrame = spark.read
.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
dataDF
}
紧接着重头戏来了。。 写sql语句,在大数据行业懂得写sql就等于会了80%
代码语言:javascript复制WITH ratings_filter_cnt AS (
SELECT
movieId,
count( * ) AS rating_cnt,
Round(avg( rating ),2) AS avg_rating
FROM
ratings
GROUP BY
movieId
HAVING
count( * ) >= 50
),
ratings_filter_score AS (
SELECT
movieId, -- 电影id
rating_cnt, -- 个数
avg_rating -- 电影平均评分
FROM ratings_filter_cnt
ORDER BY avg_rating DESC -- 平均评分降序排序
LIMIT 10 -- 平均分较高的前十部电影
)
SELECT
m.movieId,
r.rating_cnt AS ratingNum,
m.title,
r.avg_rating AS avgRating
FROM
ratings_filter_score r
JOIN movies m ON m.movieId = r.movieId ORDER BY r.avg_rating DESC
关键点在于 WITH XXX AS SELECT
最后保存写入mysql表中
代码语言:javascript复制 def saveToMysql(reportDF: DataFrame) = {
// TODO: 使用SparkSQL提供内置Jdbc数据源保存数据
reportDF
.coalesce(1)
.write
// 追加模式,将数据追加到MySQL表中,再次运行,主键存在,报错异常
.mode(SaveMode.Append)
// 覆盖模式,无需测试,直接将以前数据全部删除,再次重新重建表,肯定不行
//.mode(SaveMode.Overwrite)
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://192.168.88.100:3306/db_movies?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "db_movies.ten_most_rated_films")
.save()
}
另外两个需求的SQL:
代码语言:javascript复制 // 需求2:查找每个电影类别及其对应的平均评分
WITH explode_movies AS (
SELECT
movieId,
title,
category
FROM
movies lateral VIEW explode ( split ( genres, "\|" ) ) temp AS category //爆炸函数拆一下|
)
SELECT
m.category AS genres,
Round(avg( r.rating ),2) AS avgRating
FROM
explode_movies m
JOIN ratings r ON m.movieId = r.movieId
GROUP BY
m.category
ORDER BY avgRating DESC
// 需求3:查找被评分次数较多的前十部电影
WITH rating_group AS (
SELECT
movieId,
count( * ) AS ratingCnt
FROM ratings
GROUP BY movieId
),
rating_filter AS (
SELECT
movieId,
ratingCnt
FROM rating_group
ORDER BY ratingCnt DESC
LIMIT 10
)
SELECT
m.movieId,
m.title,
r.ratingCnt
FROM
rating_filter r
JOIN movies m ON r.movieId = m.movieId ORDER BY r.ratingCnt DESC
总结
以上便是spark电影评分数据分析二次改写,比之前一篇sql更复杂,需求更多,
愿你读过之后有自己的收获