在 Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:
- ① 利用反射机制,推导包含某种类型的 RDD,通过反射将其转换为指定类型的 DataFrame,适用于提前知道 RDD 的 Schema。
- ② 通过编程借口与 RDD 进行交互获取 Schema,并动态创建 DataFrame,在运行时决定列及其类型。
DataFrame 中的数据结构信息,即为 Scheme
① 通过反射获取 RDD 内的 Scheme
(使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。在 Scala 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。
其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._
- 这里的 sqlContext 不是包名,而是创建的 SparkSession 对象(这里为 SQLContext 对象)的变量名称,所以必须先创建 SparkSession 对象再导入。
- 这里 sqlContext 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。
代码语言:javascript复制package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofReflection {
def main(args: Array[String]): Unit = {
}
def method1():Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// 引入 sqlContext.implicits._
import sqlContext.implicits._
// 将 RDD 转成 DataFrame
/*val people = sc.textFile("people.txt").toDF()*/
val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
people.show()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
// DataFrame 转成 RDD 进行操作:根据索引号取值
teenagers.map(t=>"Name:" t(0)).collect().foreach(println)
// DataFrame 转成 RDD 进行操作:根据字段名称取值
teenagers.map(t=>"Name:" t.getAs[String]("name")).collect().foreach(println)
// DataFrame 转成 RDD 进行操作:一次返回多列的值
teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)
sc.stop()
}
/**
* 定义 Person 类
* @param name 姓名
* @param age 年龄
*/
case class Person(name:String,age:Int)
}
② 通过编程接口执行 Scheme
通过 Spark SQL 的接口创建 RDD 的 Schema,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:
- 第一步将 RDD 转为包含 row 对象的 RDD
- 第二步基于 structType 类型创建 Schema,与第一步创建的 RDD 想匹配
- 第三步通过 SQLContext 的 createDataFrame 方法对第一步的 RDD 应用 Schema
package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofInterface {
def main(args: Array[String]): Unit = {
method2()
}
def method2(): Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val people = sc.textFile("people.txt")
// 以字符串的方式定义 DataFrame 的 Schema 信息
val schemaString = "name age"
// 导入所需要的类
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
// 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema
val schema = StructType(
schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
// 将 RDD 转换成 Row
val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
// 将 Schema 作用到 RDD 上
val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
// 将 DataFrame 注册成临时表
peopleDataFrame.registerTempTable("people")
// 获取 name 字段的值
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name" t(0)).collect().foreach(println)
sc.stop()
}
}