Spark操作Kudu
一、添加Maven依赖
使用SparkSQL操作Kudu,这里需要导入Kudu与SparkSQL整合的包和SparkSQL的包,在Maven中导入如下依赖:
<!-- kudu-spark integration (Kudu 1.10.0 on CDH 6.3.2, Scala 2.11 build) -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.10.0-cdh6.3.2</version>
</dependency>
<!-- Spark SQL — Scala version (_2.11) must match the kudu-spark2 artifact -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
二、KuduContext创建表
KuduContext创建Kudu表,与使用Java API操作Kudu类似,需要经过以下步骤:
- 创建SparkSession对象
- 创建SparkContext对象
- 创建KuduContext对象
- 创建Kudu表
代码如下:
// Build a local SparkSession for the table-creation job.
val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("create_kudu_table")
  .getOrCreate()

// SparkContext backing this session; required by KuduContext.
val sc: SparkContext = session.sparkContext

// KuduContext takes the Kudu master addresses and the SparkContext.
val kuduContext = new KuduContext("cm1:7051,cm2:7051", sc)

/**
 * Create the Kudu table.
 */
// Table name.
val KUDU_TABLE_NAME = "t_spark_kudu"

// Table schema. Primary-key columns must be non-nullable (nullable = false).
val schema = StructType(Array[StructField](
  StructField("id", IntegerType, false),
  StructField("name", StringType, false),
  StructField("age", IntegerType, false),
  StructField("score", DoubleType, false)
))

// Table options: hash-partition on "id" into 10 buckets.
val options = new CreateTableOptions()
options.addHashPartitions(util.Arrays.asList("id"), 10)

// Create the table only if it does not already exist.
// Args: table name, schema, primary-key columns, table options.
if (!kuduContext.tableExists(KUDU_TABLE_NAME)) {
  kuduContext.createTable(KUDU_TABLE_NAME, schema, Seq[String]("id"), options)
}
经过以上操作,可以在Kudu WebUI中查看到对应的表:
三、KuduContext CRUD-增删改查数据
// Row type matching the Kudu table schema (id is the primary key).
case class PersonInfo(id: Int, name: String, age: Int, score: Double)

/**
 * Demonstrates CRUD against a Kudu table through KuduContext:
 * insert, scan (RDD), update/upsert, and delete by primary key.
 * Uncomment the calls in main to run each operation individually.
 */
object SparkSQLCRUDToKudu {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("create_kudu_table")
      .getOrCreate()
    // SparkContext is needed both for KuduContext and for kuduRDD.
    val sc: SparkContext = session.sparkContext
    sc.setLogLevel("Error")
    // KuduContext: Kudu master addresses + SparkContext.
    val kuduContext = new KuduContext("cm1:7051,cm2:7051", sc)
    // Target Kudu table.
    val KUDU_TABLE = "t_spark_kudu"

    /**
     * Insert rows into the table.
     */
    // insertData(session, kuduContext, KUDU_TABLE)
    /**
     * Query the Kudu table.
     */
    // queryData(kuduContext, sc, KUDU_TABLE)
    /**
     * Update rows in the Kudu table.
     */
    // updateData(session, kuduContext, KUDU_TABLE)
    /**
     * Delete rows from the Kudu table, then query to confirm.
     */
    deleteData(session, kuduContext, KUDU_TABLE)
    queryData(kuduContext, sc, KUDU_TABLE)
    session.stop()
  }

  /** Inserts three PersonInfo rows; insertRows fails if a primary key already exists. */
  def insertData(session: SparkSession, kuduContext: KuduContext, tbl: String) = {
    // Build the DataFrame to insert.
    val personList = List[PersonInfo](
      PersonInfo(1, "zhangsan", 18, 100),
      PersonInfo(2, "zhangsan", 19, 200),
      PersonInfo(3, "zhangsan", 20, 300)
    )
    import session.implicits._
    val df = personList.toDF()
    // Insert into the Kudu table t_spark_kudu.
    kuduContext.insertRows(df, tbl)
  }

  /** Scans the table as an RDD[Row] over the given columns and prints each row. */
  def queryData(kuduContext: KuduContext, sc: SparkContext, tbl: String) = {
    // Load the Kudu table's data as an RDD.
    val rdd: RDD[Row] = kuduContext.kuduRDD(sc, tbl, Seq[String]("id", "name", "age", "score"))
    rdd.foreach(println)
  }

  /** Writes one row back: upsert inserts when the key is absent, updates when present. */
  def updateData(session: SparkSession, kuduContext: KuduContext, tbl: String) = {
    val list = List[PersonInfo](
      PersonInfo(100, "tianqi", 30, 400)
    )
    import session.implicits._
    val updateDF = list.toDF()
    // updateRows: errors when the primary key does not exist, updates when it does.
    // kuduContext.updateRows(updateDF, tbl)
    // upsertRows: inserts when the primary key does not exist, updates when it does.
    kuduContext.upsertRows(updateDF, tbl)
  }

  /** Deletes rows by primary key; the DataFrame passed in must contain only key columns. */
  def deleteData(session: SparkSession, kuduContext: KuduContext, tbl: String) = {
    val list = List[PersonInfo](
      PersonInfo(1, "zhangsan", 18, 100),
      PersonInfo(2, "zhangsan", 19, 200),
      PersonInfo(3, "zhangsan", 20, 300)
    )
    import session.implicits._
    // deleteRows requires a DataFrame restricted to the primary-key column(s).
    val deleteKeysDF = list.toDF().select("id")
    kuduContext.deleteRows(deleteKeysDF, tbl)
  }
}
四、SparkSQL 操作Kudu表
val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("create_kudu_table")
  .getOrCreate()

// Options for the "kudu" data source: master addresses and table name.
val kuduOptionMap = Map[String, String](
  "kudu.master" -> "cm1:7051,cm2:7051",
  "kudu.table" -> "t_spark_kudu"
)

// Load the Kudu table as a DataFrame.
// NOTE: this must happen BEFORE `frame` is used below — the original snippet
// referenced `frame` (createTempView / SQL query) before it was defined.
val frame: DataFrame = session.read.options(kuduOptionMap).format("kudu").load()
frame.show()

// Register a temp view over the DataFrame and query it with SQL.
frame.createTempView("tmp")
session.sql(
  """
    |select count(*) from tmp
  """.stripMargin).show()

// Rows to write into the Kudu table; rows whose primary key already exists
// are replaced in Kudu.
val list = List[PersonInfo](
  PersonInfo(10, "a", 20, 100),
  PersonInfo(11, "a", 21, 101),
  PersonInfo(12, "a", 22, 102)
)
import session.implicits._
val resultDF: DataFrame = list.toDF()

// Save the DataFrame to Kudu; the kudu source currently supports only Append mode.
resultDF.write.options(kuduOptionMap).mode(SaveMode.Append).format("kudu").save()

// Query the t_spark_kudu table again to see the written rows.
session.read.options(kuduOptionMap).format("kudu").load().show()