Spark: reading data from HBase and writing it back to HBase, two versions of the code

2021-04-19 18:01:04
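Both versions below assume Spark SQL and the HBase client/MapReduce integration are on the classpath. A minimal build.sbt sketch follows; the version numbers are placeholders (my assumption, not from the original post) and should be aligned with your cluster:

// build.sbt (sketch) -- versions are placeholders, match them to your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.4.8",
  "org.apache.spark" %% "spark-sql"       % "2.4.8",
  "org.apache.hbase" %  "hbase-client"    % "2.1.10",
  "org.apache.hbase" %  "hbase-mapreduce" % "2.1.10"
)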

Spark 2 version:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // Tables to read from and to write to
    val readTable: String = "hydrogenation_flow_record"
    val writeTable: String = "test200"

    // Create the HBase read configuration; the cluster's hbase-site.xml should be on the
    // classpath (for example placed in the resources directory)
    val hBaseConfRead: Configuration = HBaseConfiguration.create()
    // INPUT_TABLE tells TableInputFormat which table to read
    hBaseConfRead.set(TableInputFormat.INPUT_TABLE, readTable)

    // Configure the output table; unlike the read side, writing needs a JobConf
    val hBaseConfWrite: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConfWrite)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, writeTable)

    // Build the Hadoop RDD; each element is a tuple whose first component is an
    // ImmutableBytesWritable, so the write side must produce a pair with the same key type
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
      hBaseConfRead,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    // Import the implicits so that toDF (and map on a DataFrame) can be used below
    import spark.implicits._

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")
    // Register the DataFrame as a temporary view
    sps.createOrReplaceTempView("sps")
    // Run the SQL query
    val frame: DataFrame = spark.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")
    // Wrap the result in a tuple: the first element is the qualifier, the second the value read from the DataFrame
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))
    // Build the RDD to write to HBase: create a Put with the row key, then add the column
    // (family, qualifier, value); addColumn can be called several times to write several columns
    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a =>
      val put: Put = new Put(Bytes.toBytes("34353454353"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      // The key of the pair must be an ImmutableBytesWritable, as expected by the Hadoop output format
      (new ImmutableBytesWritable, put)
    }
    // Write the data to HBase
    rdd.saveAsHadoopDataset(jobConf)
    // Close the session
    spark.stop()
  }
}
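A side note, not part of the original example: instead of pulling the whole table and filtering the row-key range in Spark SQL, TableInputFormat can push the range and the column selection down to the HBase scan. A minimal sketch, assuming the same column family, qualifier and row keys as above (keep in mind that the stop row of an HBase scan is exclusive, while SQL BETWEEN is inclusive):

// Push the row-key range and the column selection down to the HBase scan,
// so only the needed cells are shipped to Spark (sketch; keys and columns assumed from above)
hBaseConfRead.set(TableInputFormat.SCAN_ROW_START, "4000069:1618539744390")
hBaseConfRead.set(TableInputFormat.SCAN_ROW_STOP, "4000069:1618539744426")   // exclusive
hBaseConfRead.set(TableInputFormat.SCAN_COLUMNS, "SPSFlowInfo:SPSFlowTotal")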

Older Spark style (SparkContext + SQLContext):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    // Instead of a SparkSession, use a SparkContext plus a SQLContext
    val sc: SparkContext = new SparkContext(sparkConf)

    // Read configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "hydrogenation_flow_record")

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Write configuration
    val hBaseConf1 = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConf1)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test200")

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")
    // On Spark 1.x the equivalent call is sps.registerTempTable("sps")
    sps.createOrReplaceTempView("sps")

    val frame: DataFrame = sqlContext.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a =>
      val put: Put = new Put(Bytes.toBytes("343534543533"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)
    // sc, not spark, is the entry point in this version
    sc.stop()
  }
}
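Both versions write through the old mapred API (a JobConf plus saveAsHadoopDataset). As an alternative not used in the post, the same RDD of (ImmutableBytesWritable, Put) pairs can also be written through the new mapreduce API. A hedged sketch, reusing the rdd and the table name from above:

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}

// saveAsNewAPIHadoopDataset variant of the write above (sketch)
val job = Job.getInstance(HBaseConfiguration.create())
job.getConfiguration.set(NewTableOutputFormat.OUTPUT_TABLE, "test200")
job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)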
