HBase的相关操作-JavaAPI方式
一、需求说明
某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
二、准备工作
1、创建IDEA Maven 项目
2、导入相关pom依赖
代码语言:javascript复制 <repositories><!-- 代码库 -->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
3、创建包结构和类
在test目录创建 cn.it.hbase.admin.api_test 包结构
创建TableAmdinTest类
需求一: 使用java代码创建表
创建一个名为WATER_BILL的表,包含一个列蔟C1
代码语言:javascript复制//1 如何创建hbase中表 : WATER_BILL
@Test
public void test01() throws Exception{
//1. 创建 java 连接Hbase的 连接对象
//Configuration conf = new Configuration();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息
Connection hbConn = ConnectionFactory.createConnection(conf);
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作)
Admin admin = hbConn.getAdmin();
//3. 执行相关的操作
//3.1: 首先判断表是否存在
boolean flag = admin.tableExists(TableName.valueOf("WATER_BILL")); // 存在 返回true
if(!flag){ // 说明表不存在
//3.2: 通过表构建器构建表信息对象 : 指定表名
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("WATER_BILL"));
//3.2: 在表构建表中, 添加列族
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes()).build();
tableDescriptorBuilder.setColumnFamily(familyDescriptor);
//3.3: 构建表的信息对象
TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
//3.4: 执行创建表
admin.createTable(tableDescriptor);
}
//4. 释放资源
admin.close();
hbConn.close();
}
需求二: 往表中插入一条数据
代码语言:javascript复制// 需求2: 添加一条数据
@Test
public void test02() throws Exception{
//1. 创建 java 连接Hbase的 连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息
Connection hbConn = ConnectionFactory.createConnection(conf);
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作)
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL"));
//3. 执行相关的操作 : 添加数据命令 put操作
Put put = new Put("4944191".getBytes());
put.addColumn("C1".getBytes(),"name".getBytes(),"登卫红".getBytes());
put.addColumn("C1".getBytes(),"address".getBytes(),"贵州省铜仁市德江县".getBytes());
put.addColumn("C1".getBytes(),"sex".getBytes(),"男".getBytes());
table.put(put);
//4. 释放资源
table.close();
hbConn.close();
}
优化操作: 抽取公共方法
代码语言:javascript复制 private Connection hBaseConn; private Admin admin; private Table table; private String tableName = "WATER_BILL" ;
@Before
public void before() throws Exception {
//1. 根据连接工厂构建hbase的连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
hBaseConn = ConnectionFactory.createConnection(conf);
//2. 根据连接对象, 获取相关的管理对象: admin table
admin = hBaseConn.getAdmin();
table = hBaseConn.getTable(TableName.valueOf(tableName));
}
@After
public void after() throws Exception {
//4. 释放资源
table.close(); admin.close();hBaseConn.close();
}
需求三: 查看一条数据
代码语言:javascript复制// 需求三: 查询某一条数据: id为 4944191
@Test
public void test03() throws Exception{
//1. 创建 java 连接Hbase的 连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息
Connection hbConn = ConnectionFactory.createConnection(conf);
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作)
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL"));
//3. 执行相关的操作: 查询某一条数据 get
Get get = new Get("4944191".getBytes());
Result result = table.get(get); // 一个 Result 对象 表示 一行数据
//4. 处理结果集(查询)
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) { // 从单元格中能获取到哪些内容: rowkey 列族 列名 列值
/*byte[] familyArray = cell.getFamilyArray();
byte familyLength = cell.getFamilyLength();
int familyOffset = cell.getFamilyOffset();
String family = new String(familyArray,familyOffset,familyLength);*/
byte[] rowBytes = CellUtil.cloneRow(cell);
byte[] familyBytes = CellUtil.cloneFamily(cell);
byte[] qualifierBytes = CellUtil.cloneQualifier(cell);
byte[] valueBytes = CellUtil.cloneValue(cell);
String row = Bytes.toString(rowBytes);
String family = Bytes.toString(familyBytes);
String qualifier = Bytes.toString(qualifierBytes);
String value = Bytes.toString(valueBytes);
System.out.println("rowkey:" row ";列族为:" family ";列名为:" qualifier "; 列值:" value);
}
//5. 释放资源:
table.close();
hbConn.close();
}
需求四: 删除一条数据
代码语言:javascript复制 @Test
public void test03() throws Exception {
//3. 执行相关操作:
Delete delete = new Delete("4944191".getBytes());
delete.addFamily("C2".getBytes());
delete.addColumn("C1".getBytes(),"name".getBytes());
table.delete(delete);
}
需求五: 删除表
代码语言:javascript复制 @Test
public void test04() throws Exception {
//3. 执行相关操作
if(admin.isTableEnabled(TableName.valueOf(tableName))){
admin.disableTable(TableName.valueOf(tableName));
}
admin.deleteTable(TableName.valueOf(tableName));
}
需求六: 导入数据
- 在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中
- 说明:
- 在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中
- 用法:
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
- 开始导入:
- 将资料中数据文件上传到Linux中
- 再将文件上传到hdfs中
hadoop fs -mkdir -p /water_bill/output_ept_10W
hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W
执行命令: hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W
注意: 一定要启动yarn集群
代码语言:javascript复制hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export
需求七: 查询数据 查询2020年6月份所有用户的用水量 : C1:RECORD_DATE
代码语言:javascript复制//需求七: 查询 2020年 6月份所有用户的用户量: C1:RECORD_DATE
@Test
public void test06() throws Exception{
//1. 创建 java 连接Hbase的 连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息
Connection hbConn = ConnectionFactory.createConnection(conf);
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作)
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL"));
//3. 执行相关的操作
Scan scan = new Scan();
// LATEST_DATE >= 2020-06-01
// 列值过滤器
SingleColumnValueFilter start_filter = new SingleColumnValueFilter("C1".getBytes(), "LATEST_DATE".getBytes(),
CompareOperator.GREATER_OR_EQUAL, new BinaryComparator("2020-06-01".getBytes()));
// LATEST_DATE < 2020-07-01
SingleColumnValueFilter end_filter = new SingleColumnValueFilter("C1".getBytes(), "LATEST_DATE".getBytes(),
CompareOperator.LESS, new BinaryComparator("2020-07-01".getBytes()));
// 将两个条件合并在一起
FilterList filterList = new FilterList();
filterList.addFilter(start_filter);
filterList.addFilter(end_filter);
// 将条件封装到查询中
scan.setFilter(filterList);
scan.setLimit(10);
// 执行查询操作
ResultScanner results = table.getScanner(scan);
//4. 处理结果集(查询)
for (Result result : results) {
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) {
byte[] rowBytes = CellUtil.cloneRow(cell);
byte[] familyBytes = CellUtil.cloneFamily(cell);
byte[] qualifierBytes = CellUtil.cloneQualifier(cell);
byte[] valueBytes = CellUtil.cloneValue(cell);
String row = Bytes.toString(rowBytes);
String family = Bytes.toString(familyBytes);
String qualifier = Bytes.toString(qualifierBytes);
if("NUM_CURRENT".equalsIgnoreCase(qualifier) || "NUM_PREVIOUS".equalsIgnoreCase(qualifier)
|| "NUM_USAGE".equalsIgnoreCase(qualifier) || "TOTAL_MONEY".equalsIgnoreCase(qualifier)){
Double value = Bytes.toDouble(valueBytes);
System.out.println("rowkey:" row ";列族为:" family ";列名为:" qualifier "; 列值:" value);
}else {
String value = Bytes.toString(valueBytes);
System.out.println("rowkey:" row ";列族为:" family ";列名为:" qualifier "; 列值:" value);
}
}
System.out.println("------------------------");
}
//5. 释放资源: @After
}