Spark 2 version:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Source and target table names
    val readTable: String = "hydrogenation_flow_record"
    val writeTable: String = "test200"

    // Build the HBase configuration for reading; put the cluster's hbase-site.xml into the resources directory
    val hBaseConfRead: Configuration = HBaseConfiguration.create()
    // INPUT_TABLE tells TableInputFormat which table to read
    hBaseConfRead.set(TableInputFormat.INPUT_TABLE, readTable)

    // Configure the output table; unlike the read side, writing needs a JobConf
    val hBaseConfWrite: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConfWrite)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, writeTable)

    // Create the Hadoop RDD. Each element is a tuple whose first component is an ImmutableBytesWritable,
    // so the RDD written back to HBase must use the same key type.
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
      hBaseConfRead,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Import the implicits so toDF can turn the RDD into a DataFrame below
    import spark.implicits._
    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")

    // Register the DataFrame as a temporary view
    sps.createOrReplaceTempView("sps")

    // Run the SQL query
    val frame: DataFrame = spark.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")

    // Wrap the result in tuples: the first element is the qualifier, the second is the value read from the DataFrame
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    // Build the RDD to write to HBase: each Put takes the row key in its constructor, and addColumn
    // takes the column family, qualifier and value; several columns can be added to the same Put
    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a =>
      val put: Put = new Put(Bytes.toBytes("34353454353"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      // The pair's key must be an ImmutableBytesWritable to match the Spark/Hadoop output API;
      // TableOutputFormat takes the actual row key from the Put itself
      (new ImmutableBytesWritable, put)
    }

    // Save to HBase
    rdd.saveAsHadoopDataset(jobConf)

    // Stop the session
    spark.stop()
  }
}
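
Note that the job above scans the whole HBase table and only filters the row-key range afterwards in Spark SQL. When the range is known up front, it can be pushed down into the HBase scan itself through the scan properties that TableInputFormat reads from the configuration. A minimal sketch, assuming the same hBaseConfRead and readTable names as above; the start/stop values are just the literals from the SQL query, and the stop row is exclusive, unlike SQL BETWEEN:

// Restrict the scan to the row-key range instead of filtering the full table in Spark SQL
hBaseConfRead.set(TableInputFormat.INPUT_TABLE, readTable)
hBaseConfRead.set(TableInputFormat.SCAN_ROW_START, "4000069:1618539744390")
hBaseConfRead.set(TableInputFormat.SCAN_ROW_STOP, "4000069:1618539744426")
// Only fetch the one column that is actually used
hBaseConfRead.set(TableInputFormat.SCAN_COLUMNS, "SPSFlowInfo:SPSFlowTotal")
// If hbase-site.xml is not on the classpath, the ZooKeeper quorum can be set directly instead
// hBaseConfRead.set("hbase.zookeeper.quorum", "zk-host1,zk-host2")

With the scan restricted, newAPIHadoopRDD only returns rows in that range, so the SQL WHERE clause becomes optional.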
Older Spark version:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    // The old API builds a SparkContext directly instead of a SparkSession
    val sc: SparkContext = new SparkContext(sparkConf)

    // Read-side HBase configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "hydrogenation_flow_record")

    // SQLContext stands in for SparkSession for SQL and the toDF implicits
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Write-side configuration through a JobConf
    val hBaseConf1 = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConf1)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test200")

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")

    // registerTempTable is the pre-2.0 API; createOrReplaceTempView replaced it in Spark 2
    sps.registerTempTable("sps")

    val frame: DataFrame = sqlContext.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")

    // In the old API, DataFrame.map returns an RDD rather than a Dataset
    val tupleRDD: RDD[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleRDD.map { a =>
      val put: Put = new Put(Bytes.toBytes("343534543533"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
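
Both versions write through the old mapred API, i.e. a JobConf plus saveAsHadoopDataset. The same (ImmutableBytesWritable, Put) RDD can also be written through the newer mapreduce API, which replaces the JobConf with a Job. A minimal sketch, assuming the rdd and writeTable values from the Spark 2 example; note that this TableOutputFormat comes from org.apache.hadoop.hbase.mapreduce rather than the mapred package imported above:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// Point the new-API TableOutputFormat at the target table
val hBaseConfWrite = HBaseConfiguration.create()
hBaseConfWrite.set(TableOutputFormat.OUTPUT_TABLE, writeTable)
val job = Job.getInstance(hBaseConfWrite)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Same (ImmutableBytesWritable, Put) RDD as before, written through the new API
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

Either way the row key still comes from each Put, so the write behaves the same; the new API simply avoids mixing the mapred JobConf into otherwise new-API code.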