Big Data Kudu (9): Operating Kudu with Spark

2022-12-21 08:51:54

Operating Kudu with Spark

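1. Adding the Maven dependencies

To work with Kudu from Spark SQL, the project needs both the Kudu-Spark integration package and the Spark SQL package. Add the following dependencies in Maven: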

<!-- kudu-spark dependency -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.10.0-cdh6.3.2</version>
</dependency>

<!-- Spark SQL -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
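</dependency>

2. Creating a table with KuduContext

Creating a Kudu table through KuduContext is similar to using the Kudu Java API and takes the following steps:

  • Create a SparkSession object
  • Get the SparkContext object from the session
  • Create a KuduContext object
  • Create the Kudu table

The code is as follows: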

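import java.util

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("create_kudu_table")
  .getOrCreate()

// Get the SparkContext
val sc: SparkContext = session.sparkContext

// Build the KuduContext from the Kudu master addresses
val kuduContext = new KuduContext("cm1:7051,cm2:7051", sc)

/**
  * Create the Kudu table
  */

// Table name
val KUDU_TABLE_NAME = "t_spark_kudu"

// Table schema; every column is declared non-nullable
val schema = StructType(Array[StructField](
  StructField("id", IntegerType, false),
  StructField("name", StringType, false),
  StructField("age", IntegerType, false),
  StructField("score", DoubleType, false)
))

// Table options: hash-partition on "id" into 10 buckets
val options = new CreateTableOptions()
options.addHashPartitions(util.Arrays.asList("id"), 10)

// Create the table if it does not exist yet.
// Arguments: table name, schema, primary key columns, table options
if (!kuduContext.tableExists(KUDU_TABLE_NAME)) {
  kuduContext.createTable(KUDU_TABLE_NAME, schema, Seq[String]("id"), options)
}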

After these steps, the new table is visible in the Kudu web UI.
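While experimenting, it can be convenient to drop and recreate the table. The following is a minimal sketch of one way to do that, assuming the same master addresses and table name as above. `RecreateKuduTable` is a hypothetical object name, and the single-replica setting is an assumption for a small test cluster (Kudu's default replication factor is 3), not something the setup above specifies. `deleteTable` permanently removes the table and its data, so this is only suitable for test tables.

```scala
import java.util

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Hypothetical helper program: drops t_spark_kudu if present, then creates it again.
object RecreateKuduTable {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local")
      .appName("recreate_kudu_table")
      .getOrCreate()

    val kuduContext = new KuduContext("cm1:7051,cm2:7051", session.sparkContext)
    val tableName = "t_spark_kudu"

    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("score", DoubleType, nullable = false)
    ))

    // Drop the old table first so createTable cannot collide with it.
    // This deletes all rows stored in the table.
    if (kuduContext.tableExists(tableName)) {
      kuduContext.deleteTable(tableName)
    }

    val options = new CreateTableOptions()
    options.addHashPartitions(util.Arrays.asList("id"), 10)
    // Assumption: a test cluster with fewer than three tablet servers.
    // Remove this line to keep Kudu's default replication factor.
    options.setNumReplicas(1)

    kuduContext.createTable(tableName, schema, Seq("id"), options)

    session.stop()
  }
}
```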

3. CRUD with KuduContext: inserting, querying, updating, and deleting data

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

case class PersonInfo(id: Int, name: String, age: Int, score: Double)

object SparkSQLCRUDToKudu {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("kudu_crud")
      .getOrCreate()

    // Get the SparkContext
    val sc: SparkContext = session.sparkContext
    sc.setLogLevel("ERROR")

    // Build the KuduContext
    val kuduContext = new KuduContext("cm1:7051,cm2:7051", sc)

    // Kudu table name
    val KUDU_TABLE = "t_spark_kudu"

    /**
      * Insert data into the table
      */
    // insertData(session, kuduContext, KUDU_TABLE)

    /**
      * Query data from the Kudu table
      */
    // queryData(kuduContext, sc, KUDU_TABLE)

    /**
      * Update data in the Kudu table
      */
    // updateData(session, kuduContext, KUDU_TABLE)

    /**
      * Delete data from the Kudu table, then query to check the result
      */
    deleteData(session, kuduContext, KUDU_TABLE)
    queryData(kuduContext, sc, KUDU_TABLE)

    session.stop()
  }

  def insertData(session: SparkSession, kuduContext: KuduContext, tbl: String): Unit = {
    // Prepare the DataFrame
    val personList = List[PersonInfo](
      PersonInfo(1, "zhangsan", 18, 100),
      PersonInfo(2, "zhangsan", 19, 200),
      PersonInfo(3, "zhangsan", 20, 300)
    )
    import session.implicits._
    val df = personList.toDF()
    // Insert the rows into the Kudu table t_spark_kudu
    kuduContext.insertRows(df, tbl)
  }

  def queryData(kuduContext: KuduContext, sc: SparkContext, tbl: String): Unit = {
    // Load the listed columns of the Kudu table as an RDD
    val rdd: RDD[Row] = kuduContext.kuduRDD(sc, tbl, Seq[String]("id", "name", "age", "score"))
    rdd.foreach(println)
  }

  def updateData(session: SparkSession, kuduContext: KuduContext, tbl: String): Unit = {
    val list = List[PersonInfo](
      PersonInfo(100, "tianqi", 30, 400)
    )
    import session.implicits._
    val updateDF = list.toDF()

    // updateRows: fails if the primary key does not exist, updates the row if it does
    // kuduContext.updateRows(updateDF, tbl)

    // upsertRows: inserts the row if the primary key does not exist, updates it if it does
    kuduContext.upsertRows(updateDF, tbl)
  }

  def deleteData(session: SparkSession, kuduContext: KuduContext, tbl: String): Unit = {
    val list = List[PersonInfo](
      PersonInfo(1, "zhangsan", 18, 100),
      PersonInfo(2, "zhangsan", 19, 200),
      PersonInfo(3, "zhangsan", 20, 300)
    )
    import session.implicits._
    // deleteRows expects a DataFrame that contains only the primary key column(s)
    val deleteKeysDF = list.toDF().select("id")
    kuduContext.deleteRows(deleteKeysDF, tbl)
  }
}
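The `queryData` method above prints raw `Row` objects. If typed records are more convenient, the rows can be mapped back to a case class. The following is a minimal sketch under the assumption that the columns are requested in the order `id`, `name`, `age`, `score`, so the fields can be read by position. `TypedKuduRead` is a hypothetical object name, and `PersonInfo` is redeclared only so the snippet compiles on its own.

```scala
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

// Redeclared here so the sketch is self-contained; same shape as the table schema.
case class PersonInfo(id: Int, name: String, age: Int, score: Double)

// Hypothetical example program: reads t_spark_kudu and converts rows to PersonInfo.
object TypedKuduRead {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local")
      .appName("typed_kudu_read")
      .getOrCreate()
    val sc = session.sparkContext

    val kuduContext = new KuduContext("cm1:7051,cm2:7051", sc)

    // The projection order below determines the positions used in the mapping.
    val rows: RDD[Row] =
      kuduContext.kuduRDD(sc, "t_spark_kudu", Seq("id", "name", "age", "score"))

    // Read each field by position: 0 = id, 1 = name, 2 = age, 3 = score.
    val people: RDD[PersonInfo] = rows.map { r =>
      PersonInfo(r.getInt(0), r.getString(1), r.getInt(2), r.getDouble(3))
    }

    // Filter with ordinary Scala code, then bring the results to the driver to print.
    people.filter(_.age >= 19).collect().foreach(println)

    session.stop()
  }
}
```

Collecting before printing keeps the output on the driver, which matters once the job runs on a cluster rather than with a `local` master.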

4. Working with Kudu tables through Spark SQL

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("sparksql_kudu")
  .getOrCreate()

// Options Spark SQL needs to locate the Kudu table
val kuduOptionMap = Map[String, String](
  "kudu.master" -> "cm1:7051,cm2:7051",
  "kudu.table" -> "t_spark_kudu"
)

// Load the table as a DataFrame
val frame: DataFrame = session.read.options(kuduOptionMap).format("kudu").load()
frame.show()

// Register the DataFrame as a temporary view and query it with SQL
frame.createTempView("tmp")
session.sql(
  """
    |select count(*) from tmp
  """.stripMargin).show()

// Prepare the DataFrame to write to Kudu (PersonInfo is the case class from section 3);
// rows whose primary key already exists in Kudu are replaced
val list = List[PersonInfo](
  PersonInfo(10, "a", 20, 100),
  PersonInfo(11, "a", 21, 101),
  PersonInfo(12, "a", 22, 102)
)
import session.implicits._
val resultDF: DataFrame = list.toDF()

// Save the DataFrame to the Kudu table; only Append mode is currently supported
resultDF.write.options(kuduOptionMap).mode(SaveMode.Append).format("kudu").save()

// Query the Kudu table t_spark_kudu again
session.read.options(kuduOptionMap).format("kudu").load().show()
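Once the table is loaded as a DataFrame, it can also be filtered and projected with ordinary SQL. The following is a minimal sketch, assuming the same master addresses and table as above. `KuduSqlFilter` and the view name `people` are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example program: runs a filtered SQL query against t_spark_kudu.
object KuduSqlFilter {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local")
      .appName("kudu_sql_filter")
      .getOrCreate()

    val kuduOptionMap = Map(
      "kudu.master" -> "cm1:7051,cm2:7051",
      "kudu.table" -> "t_spark_kudu"
    )

    // Load the Kudu table through the "kudu" data source.
    val people: DataFrame = session.read.options(kuduOptionMap).format("kudu").load()

    // createOrReplaceTempView does not fail if the view name is already registered.
    people.createOrReplaceTempView("people")

    // Select a subset of columns and rows with plain SQL.
    session.sql(
      """
        |SELECT id, name, score
        |FROM people
        |WHERE age >= 20
        |ORDER BY id
      """.stripMargin).show()

    session.stop()
  }
}
```

Using `createOrReplaceTempView` instead of `createTempView` is a design choice for repeated runs in the same session, since `createTempView` throws if the view already exists.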
