案例三:电影评分数据分析
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)。
数据格式如下,每行数据各个字段之间使用双冒号分开:
数据处理分析步骤如下:
- 第一步、读取电影评分数据,从本地文件系统读取
- 第二步、转换数据,指定Schema信息,封装到DataFrame
- 第三步、基于SQL方式分析
- 第四步、基于DSL方式分析
代码实现
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:
代码语言:javascript复制package cn.itcast.sql
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
*/
object SparkTop10Movie {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据
.filter(line => null != line && line.trim.split("t").length == 4)
// 提取转换数据
.mapPartitions{iter =>
iter.map{line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = line.trim.split("t")
// 返回四元组
(userId, movieId, rating.toDouble, timestamp.toLong)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
ratingsDF.printSchema()
/*
------ ------- ------ ---------
|userId|movieId|rating|timestamp|
------ ------- ------ ---------
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
------ ------- ------ ---------
*/
ratingsDF.show(4)
// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 200
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)
println("===============================================================")
// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg(
round(avg($"rating"), 2).as("avg_rating"),
count($"movieId").as("cnt_rating")
)
// 过滤:评分次数大于200
.filter($"cnt_rating" > 200)
// 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)
/*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1)
.write
.mode("overwrite")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "root")
.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8",
"top10_movies",
new Properties()
)
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
.coalesce(1)
.write.mode("overwrite")
.csv("data/output/top10-movies")
// 释放缓存数据
resultDF.unpersist()*/
spark.stop()
}
}
Shuffle分区数
运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置
代码语言:javascript复制val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._