本文原作者:赖博先,经授权后发布。
一、简介
Spark SQL是spark主要组成模块之一,其主要作用与结构化数据,与hadoop生态中的hive是对标的。而DataFrame是spark SQL的一种编程抽象,提供更加便捷同时类同与SQL查询语句的API,让熟悉hive的数据分析工程师能够非常快速上手。
DataFrame是一种以命名列的方式组织的分布式数据集,可以类比于hive中的表。但是比hive表更加灵活的是,你可以使用各种数据源来构建一个DataFrame,如:结构化数据文件(例如json数据)、hive表格、外部数据库,还可以直接从已有的RDD变换得来。后面会把相关方法、接口跟大家一一道来。
二、初步使用
大家学习一门语言可能都是从“hello word!”开始的,这主要目的是让学习者熟悉程序运行的环境,同时亲身感受程序运行过程。这里我们也会从环境到运行的步骤进行讲解。
导入spark运行环境相关的类
所有spark相关的操作都是以sparkContext类作为入口,而Spark SQL相关的所有功能都是以SQLContext类作为入口。下面的语句是新建入口类的对象。最下面的语句是引入隐式转换,隐式的将RDD转换为DataFrame。
下面就是从tdw表中读取对应的表格数据,然后就可以使用DataFrame的API来操作数据表格,其中TDWSQLProvider是数平提供的spark tookit,可以在KM上找到这些API的详细用法。
这段代码的意思是从tdw 表中读取对应分区的数据,select出表格中对应的字段(这里面的字段名字就是表格字段名字,需要用双引号)toDF将筛选出来的字段转换成DataFrame,在进行groupBy操作,这里的groupBy操作跟TDW hive操作是一样的意思,对指定字段进行分组操作,count函数用来计数计数,这里得到的DataFrame最后有一个”count”命名的字段保存每个分组的个数(这里特别需要注意函数的返回类型,groupby函数返回的并不是dataframe类型的数据,后面会提到)。接下来的printSchema函数是打印出edge的视图,可以理解成tdw idea里面的show DDL,Show函数是打印出这个DataFrame前20行数据(默认),当然可以指定行数打印。
从上面的例子中可以看出,DataFrame基本把SQL函数给实现了,在hive中用到的很多操作(如:select、groupBy、count、join等等)可以使用同样的编程习惯写出spark程序,这对于没有函数式编程经验的同学来说绝对福利。
三、函数说明及其用法
函数式编程是spark编程的最大特点,而函数则是函数式编程的最小操作单元,这边主要列举DataFrame常用函数以及主要用法:
Action 操作
特别注意每个函数的返回类型
1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
3、 count() 返回一个number类型的,返回dataframe集合的行数
4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()
5、 first() 返回第一行 ,类型是row类型
6、 head() 返回第一行 ,类型是row类型
7、 head(n:Int)返回n行 ,类型是row 类型
8、 show()返回dataframe集合的值 默认是20行,返回类型是unit
9、 show(n:Int)返回n行,,返回值类型是unit
10、 table(n:Int) 返回n行 ,类型是row 类型
DataFrame的基本操作
1、 cache()同步数据的内存
2、 columns 返回一个string类型的数组,返回值是所有列的名字
3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
4、 explan()打印执行计划
5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit 默认是false ,如果输入true 将会打印 逻辑的和物理的
6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
8、 printSchema() 打印出字段名称和类型 按照树状结构来打印
9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回
11、 toDF()返回一个新的dataframe类型的
12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据
14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD
聚合函数:
1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值
df.agg(max("age"), avg("salary"))
df.groupBy().agg(max("age"), avg("salary"))
2、 agg(exprs: Map[String, String]) 返回dataframe类型 ,同数学计算求值 map类型的
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
3、 agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe类型 ,同数学计算求值
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
4、 apply(colName: String) 返回column类型,捕获输入进去列的对象
5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
6、 col(colName: String) 返回column类型,捕获输入进去列的对象
7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
8、 distinct 去重 返回一个dataframe类型
9、 drop(col: Column) 删除某列 返回dataframe类型
10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的;这个操作非常有用呀
12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分
df.explode("name","names") {name :String=> name.split(" ")}.show();
将name字段根据空格来拆分,拆分的字段放在names里面
13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型
df.filter("age>10").show();
df.filter(df("age")>10).show();
df.where(df("age")>10).show(); 都可以
14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型
df.groupBy("age").agg(Map("age"->"count")).show();
df.groupBy("age").avg().show();都可以
这里如果要把groupBy之后的结果转换成一个Dataframe需要另一个函数转换一下,比如 count
15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name")
and df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe类型 去n 条数据出来
18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤df.na.drop().show(); 删除为空的行
19、 orderBy(sortExprs: Column*) 做alise排序,还可以指定进行降序排序desc
20、 select(cols:string*) dataframe 做字段的刷选
df.select($"colA", $"colB" 1)
这里面select有两种类型的参数,一种是上面的string类型,就是前面没有$符号,如果加了$标识这是一个column类型。使用这种类型需要加import sqlContext.implicits._ (这些是从身边spark大神xuehao同学那里学到的)这些细节真的从实践中来,所以大家赶紧收藏!
21、selectExpr(exprs: String*) 做字段的刷选
df.selectExpr("name","name as names","upper(name)","age 1").show();
22、 sort(sortExprs: Column*) 排序
df.sort(df("age").desc).show(); 默认是asc;这里的写法可以有很多几种。
23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
24、 withColumnRenamed(existingName: String, newName: String)
修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列
df.withColumn("aa",df("name")).show();
具体例子:
产看表格数据和表格视图
获取指定列并对齐进行操作
这里注意,这里的$”field”表示类型是column
根据条件进行过滤
首先是filter函数,这个跟RDD的是类同的,根据条件进行逐行过滤。现在的filter函数支持两种类型的参数,如下:一种是string类型,上图所示,运算符是在字符串里面的,还有一种是column类型也就是带$,注意运算符是在外面的。
另外一个where函数,类似,看图不赘述;
指定行或者多行进行排序排序
Sort和orderBY都可以达到排序的效果,可以指定根据一行或者多行进行排序,默认是升序,如果要使用降序进行排序,请使用column类型;
分组操作
分组聚合是在数据分析中最长用到的操作之一,比如上图所示,需要对某个字段进行分组求和、求平均、求最大最小等,可以直接使用groupBy函数,比SQL语句更类似于自然语言。这里还是那句话,得注意每个函数的返回类型。
Join操作
Join操作可以支持TDW sql涉及到的连接操作,格式也非常固定。
这里就先讲到这里,其实这里介绍的只是spark DataFrame最基础的一些函数,官方还提供了非常高级的API,比如bloomFilter、corr等等,同学们如果掌握了上面的内容,其他高级的可以查看官网提供的API介绍:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions