环境配置
Maven添加hbase-client的依赖
代码语言:txt复制 <!--HBase Client-->
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.2</version>
</dependency>
</dependencies>
Scala操作HBase
创建HBase的配置、Connection、Admin
代码语言:txt复制 /*
*创建一个HBase的配置,创建的时候会去加载classpath下的hbase-default.xml和hbase-site.xml两个配置文件
*/
private val conf = HBaseConfiguration.create()
//设置Zookeeper的地址和端口来访问HBase,先从配置中读取,如配置中不存在,设置地址为localhost,端口为默认端口2181
conf.set(HConstants.ZOOKEEPER_QUORUM, conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST))
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, conf.get(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT.toString))
//创建操作HBase的入口connection
private val conn: Connection = ConnectionFactory.createConnection(conf)
//创建操作HBase表的入口Admin
private val admin: Admin = conn.getAdmin
获取表
代码语言:txt复制 /**
* 获取表
*
* @param tableName 表名
* @return HBase表
*/
def getTable(tableName: String): Table = {
val table = Try(conn.getTable(TableName.valueOf(tableName)))
table.get.close()
table match {
case Success(v) => v;
case Failure(e) => e.printStackTrace()
null
}
}
创建表
代码语言:txt复制/**
* 创建表
*
* @param tableName 表名
* @param cf 列族
*/
def createTable(tableName: String, cf: String): Unit = {
//创建表
val tableDesc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
tableDesc.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("basic".getBytes).build())
println(s"Creating table `$tableName`. ")
Try {
if (admin.tableExists(TableName.valueOf(tableName))) {
admin.disableTable(TableName.valueOf(tableName))
admin.deleteTable(TableName.valueOf(tableName))
}
admin.createTable(tableDesc.build())
admin.close()
println("Done!")
} match {
case Success(_) =>
case Failure(e) => e.printStackTrace()
}
}
删除表
代码语言:txt复制/**
* 删除表
*
* @param tableName 表名
* @param rowKey 行键
*/
def delete(tableName: String, rowKey: String): Unit = {
val table = conn.getTable(TableName.valueOf(tableName))
Try {
val d = new Delete(rowKey.getBytes)
table.delete(d)
table.close()
} match {
case Success(_) =>
case Failure(e) => e.printStackTrace()
}
}
往表中存放数据
代码语言:javascript复制/**
*
* 往表中存放数据
*
* @param tableName 表名
* @param rowKey 行键
* @param cf 列族
* @param qualifier 列限定符
* @param value 具体的值
*/
def put(tableName: String, rowKey: String, cf: String, qualifier: String, value: String): Unit = {
println(s"Put row key $rowKey into $tableName. ")
val table = conn.getTable(TableName.valueOf(tableName))
Try {
//准备一个row key
val p = new Put(rowKey.getBytes)
//为put操作指定 column qualifier 和 value
p.addColumn(cf.getBytes, qualifier.getBytes, value.getBytes)
//放数据到表中
table.put(p)
table.close()
} match {
case Success(_) => println("Done!")
case Failure(e) => e.printStackTrace()
}
}
获得表中的数据
代码语言:txt复制 /**
* 获得表里面的数据
*
* @param tableName 表名
* @param rowKey 行键
* @param cf 列族
* @param qualifier 列限定符
* @return 获得的数据
*/
def get(tableName: String, rowKey: String, cf: String, qualifier: String): String = {
val table = conn.getTable(TableName.valueOf(tableName))
Try {
val g = new Get(rowKey.getBytes)
val result = table.get(g)
table.close()
Bytes.toString(result.getValue(cf.getBytes(), qualifier.getBytes()))
} match {
case Success(v) => v
case Failure(e) => e.printStackTrace()
null
}
}
扫描表中的数据
代码语言:txt复制 /**
* 扫描数据
*
* @param tableName 表名
* @param cf 列族
* @param qualifier 列限定符
*/
def scan(tableName: String, cf: String, qualifier: String): Unit = {
val table = conn.getTable(TableName.valueOf(tableName))
val s = new Scan()
s.addColumn(cf.getBytes, qualifier.getBytes)
val scanner = table.getScanner(s)
Try {
val iterator = scanner.iterator()
while (iterator.hasNext) {
val next = iterator.next()
println("Found row: " next)
println("Found value: " Bytes.toString(
next.getValue(cf.getBytes, qualifier.getBytes)))
}
scanner.close()
table.close()
} match {
case Success(_) =>
case Failure(e) => e.printStackTrace()
}
}
附录
完整的代码已经上传到gist。
file-hbaseutils-scala
参考文献:
Spark 下操作 HBase(1.0.0 新 API)