由于HIVE更新的机制极其不适应SPARK环境,于是利用HBase来执行HIVE中某些统计结果的更新。首先要做的是实现Spark Hive访问,得到RDD,再将这个RDD导入到HBase中操作。 然而网上关于这一块目前资料还真很少。但是其原理总体上来说是非常简单的。 步骤主要是两步: (1)开启hive连接器,实现spark hive的访问,得到dataframe对象。
(2)对dataframe进行RDD转换,进行hbase的批量导入bulkput函数来实现。
hbaseContext.bulkPut[Row](rddFromSql.rdd, tableName, (putRecord) => { val put = new Put(Bytes.toBytes(putRecord.getString(0))) put.add(Bytes.toBytes(columnFamily1),Bytes.toBytes("receiver"),Bytes.toBytes(putRecord.getString(1))) put.add(Bytes.toBytes(columnFamily1),Bytes.toBytes("count"),Bytes.toBytes(putRecord.getLong(2))) put }, true);
运行成功,成功导入600W数据.