写在前面: 博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己!
之前博主利用业余时间,梳理了一份《SparkSQL编程系列》,奈何当时考虑不周,写的不是很详细。于是在正式开始学习了之后,决定整理一篇适合像我一样的小白级别都能看得懂的IDEA操作SparkSQL教程,于是就有了下文…
码字不易,先赞后看,养成习惯!

使用IDEA开发SparkSQL
准备好POM
代码语言:javascript复制<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<hadoop.version>2.7.4</hadoop.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>1. 创建DataFrame/DataSet
Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:
第1种:指定列名添加Schema 第2种:通过StructType指定Schema 第3种:编写样例类,利用反射机制推断Schema
下面将针对上面出现的三种类型为大家一一展示
这里我们先准备好数据源tt.txt
代码语言:javascript复制19 zhhshang 66
20 lisi 66
19 wangwu 77
31 zhaoliu 66
19 maqi 881.1 指定列名添加Schema
代码语言:javascript复制object Demo01 {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//2.读取文件
val fileRDD: RDD[String] = sc.textFile("in/person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
val personDF: DataFrame = rowRDD.toDF("id","name","age")
//查询前十行数据
personDF.show(10)
//打印元数据信息
personDF.printSchema()
//关闭资源
sc.stop()
spark.stop()
}
}运行结果

1.2 StructType指定Schema
代码语言:javascript复制object Demo02 {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//2.读取文件
val fileRDD: RDD[String] = sc.textFile("in/tt.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
// 将数据封装成ROW类型的 行数据
val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
//import spark.implicits._
//设置表的一个模式
//
val schema: StructType = StructType(Seq(
StructField("id", IntegerType, true), //允许为空
StructField("name", StringType, true),
StructField("age", IntegerType, true))
)
val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
//
//查询前十行数据
personDF.show(10)
//打印元数据信息
personDF.printSchema()
//关闭资源
sc.stop()
spark.stop()
}
}运行结果

1.3 反射推断Schema
代码语言:javascript复制object Demo03 {
// 定义一个样例类
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.读取文件
val fileRDD: RDD[String] = sc.textFile("in/tt.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}运行结果

可以发现以上三种方法都可以成功创建DataFrame/DataSet,接下来讲解的是在利用SparkSQL花式查询数据。
2. 花式查询
代码语言:javascript复制object QueryDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.读取文件
val fileRDD: RDD[String] = sc.textFile("in/person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
//=======================SQL方式查询=======================
//0.注册表
personDF.createOrReplaceTempView("t_person")
//1.查询所有数据
spark.sql("select * from t_person").show()
//2.查询age 1
spark.sql("select age,age 1 from t_person").show()
//3.查询age最大的两人
spark.sql("select name,age from t_person order by age desc limit 2").show()
//4.查询各个年龄的人数
spark.sql("select age,count(*) from t_person group by age").show()
//5.查询年龄大于30的
spark.sql("select * from t_person where age > 30").show()
//=======================DSL方式查询=======================
//1.查询所有数据
personDF.select("name","age")
//2.查询age 1
personDF.select($"name",$"age" 1)
//3.查询age最大的两人
personDF.sort($"age".desc).show(2)
//4.查询各个年龄的人数
personDF.groupBy("age").count().show()
//5.查询年龄大于30的
personDF.filter($"age" > 30).show()
sc.stop()
spark.stop()
}
}查询出来的结果很多,博主就不贴长图出来了。感兴趣的朋友可以复制代码自行测试。
3. 相互转化
RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作 2)使用DSL/SQL对表操作
代码语言:javascript复制object TransformDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.读取文件
val fileRDD: RDD[String] = sc.textFile("in/person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
//=========================相互转换======================
//1.RDD-->DF
val personDF: DataFrame = personRDD.toDF
//2.DF-->RDD
val rdd: RDD[Row] = personDF.rdd
//3.RDD-->DS
val DS: Dataset[Person] = personRDD.toDS()
//4.DS-->RDD
val rdd2: RDD[Person] = DS.rdd
//5.DF-->DS
val DS2: Dataset[Person] = personDF.as[Person]
//6.DS-->DF
val DF: DataFrame = DS2.toDF()
sc.stop()
spark.stop()
}
}4. Spark SQL完成WordCount
作为一个经典的案例,初学SparkSQL怎么能少得了WordCount的身影呢,下面为大家带来的就是使用SparkSQL完成WordCount的开发过程。同样,分为SQL风格和DSL风格~
准备数据
words.txt
代码语言:javascript复制hadoop hadoop spark spark spark java java
sqoop sqoop jdk jdk
hive hive hive hbase
hbase flume
flume oozie oozie
flink flink flink
hello hello hello scala scala4.1 SQL风格
代码语言:javascript复制object WordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.读取文件
val fileDF: DataFrame = spark.read.text("in/words.txt")
val fileDS: Dataset[String] = spark.read.textFile("in/words.txt")
//fileDF.show()
//fileDS.show()
//3.对每一行按照空格进行切分并压平
//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
//wordDS.show()
/*
-----
|value|
-----
|hello|
| me|
|hello|
| you|
...
*/
//4.对上面的数据进行WordCount
wordDS.createOrReplaceTempView("t_word")
val sql =
"""
|select value ,count(value) as count
|from t_word
|group by value
|order by count desc
""".stripMargin
spark.sql(sql).show()
sc.stop()
spark.stop()
}
}运行结果

4.2 DSL风格
代码语言:javascript复制object WordCount2 {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.读取文件
val fileDF: DataFrame = spark.read.text("in/words.txt")
val fileDS: Dataset[String] = spark.read.textFile("in/words.txt")
//fileDF.show()
//fileDS.show()
//3.对每一行按照空格进行切分并压平
//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
//wordDS.show()
/*
-----
|value|
-----
|hello|
| me|
|hello|
| you|
...
*/
//4.对上面的数据进行WordCount
wordDS.groupBy("value").count().orderBy($"count".desc).show()
sc.stop()
spark.stop()
}
}运行结果

本次的分享就到这里了,关于SparkSQL最基础的内容就在这里了,受益或对大数据技术感兴趣的朋友记得点赞关注(^U^)ノ~YO 后续博主还会更SparkSQL一些进阶拓展的内容,敬请期待(✪ω✪)


