hadoop2-HBase的Java API操作

2018-12-10 10:22:54 浏览数 (1)

HBase提供了丰富的Java API,并且可以通过表连接池(HTablePool,注意它池化的是HTable实例,而不是线程)来复用表连接。下面我用HTablePool来展示一下如何使用Java API操作HBase。

项目结构如下:

我使用的HBase版本是:

hbase-0.98.9-hadoop2-bin.tar.gz

大家下载并解压后,可以在其中的lib目录下找到所需的jar文件,即上图项目结构中所示的hbase-lib资源。

接口类:

/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java

代码语言:java
 1 package com.b510.hbase.util.dao;
 2 
 3 import java.util.List;
 4 
 5 import org.apache.hadoop.hbase.client.HTableInterface;
 6 
 7 
 8 /**
 9  * @author Hongten
10  * @created 7 Nov 2018
11  */
12 public interface HbaseDao {
13 
14     // initial table
15     public HTableInterface getHTableFromPool(String tableName);
16 
17     // check if the table is exist
18     public boolean isHTableExist(String tableName);
19 
20     // create table
21     public void createHTable(String tableName, String[] columnFamilys);
22 
23     // insert new row
24     public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
25 
26     // get row by row key
27     public void getRow(String tableName, String rowKey);
28 
29     public void getAllRows(String tableName);
30 
31     // get rows by giving range
32     public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
33 
34     //delete row
35     public void delRow(String tableName, String rowKey);
36     
37     //delete rows by row keys
38     public void delRowsByRowKeys(String tableName, List<String> rowKeys);
39 
40     // auto flush data when close
41     public void closeAutoFlush(HTableInterface table);
42 
43     // close table
44     public void closeTable(HTableInterface table);
45 
46     // close pool connection
47     public void closePoolConnection();
48 
49     // delete table
50     public void deleteHTable(String tableName);
51 }

实现类:

/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java

代码语言:java
  1 package com.b510.hbase.util.dao.impl;
  2 
  3 import java.io.IOException;
  4 import java.util.List;
  5 
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.hbase.Cell;
  8 import org.apache.hadoop.hbase.CellUtil;
  9 import org.apache.hadoop.hbase.HBaseConfiguration;
 10 import org.apache.hadoop.hbase.HColumnDescriptor;
 11 import org.apache.hadoop.hbase.HTableDescriptor;
 12 import org.apache.hadoop.hbase.MasterNotRunningException;
 13 import org.apache.hadoop.hbase.TableName;
 14 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 15 import org.apache.hadoop.hbase.client.Delete;
 16 import org.apache.hadoop.hbase.client.Get;
 17 import org.apache.hadoop.hbase.client.HBaseAdmin;
 18 import org.apache.hadoop.hbase.client.HTableInterface;
 19 import org.apache.hadoop.hbase.client.HTablePool;
 20 import org.apache.hadoop.hbase.client.Put;
 21 import org.apache.hadoop.hbase.client.Result;
 22 import org.apache.hadoop.hbase.client.ResultScanner;
 23 import org.apache.hadoop.hbase.client.Scan;
 24 
 25 import com.b510.hbase.util.dao.HbaseDao;
 26 
 27 /**
 28  * @author Hongten
 29  * @created 7 Nov 2018
 30  */
 31 @SuppressWarnings("deprecation")
 32 public class HbaseDaoImpl implements HbaseDao {
 33 
 34     private static Configuration conf = null;
 35     private static HBaseAdmin hAdmin;
 36     private static HTablePool pool;
 37 
 38     private static int defaultPoolSize = 5;
 39 
 40     public HbaseDaoImpl(int poolSize) {
 41         conf = HBaseConfiguration.create();
 42         conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
 43         try {
 44             hAdmin = new HBaseAdmin(conf);
 45             // the default pool size is 5.
 46             pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
 47         } catch (MasterNotRunningException e) {
 48             e.printStackTrace();
 49         } catch (ZooKeeperConnectionException e) {
 50             e.printStackTrace();
 51         } catch (IOException e) {
 52             e.printStackTrace();
 53         }
 54     }
 55 
 56     @Override
 57     public HTableInterface getHTableFromPool(String tableName) {
 58         HTableInterface table = pool.getTable(tableName);
 59         return table;
 60     }
 61 
 62     @Override
 63     public boolean isHTableExist(String tableName) {
 64         try {
 65             return hAdmin.tableExists(tableName);
 66         } catch (IOException e) {
 67             e.printStackTrace();
 68         }
 69         return false;
 70     }
 71 
 72     @Override
 73     public void createHTable(String tableName, String[] columnFamilys) {
 74         if (!isHTableExist(tableName)) {
 75             HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
 76             // The Hbase suggested the number of column family should be less than 3.
 77             // Normally, there only have 1 column family.
 78             for (String cfName : columnFamilys) {
 79                 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
 80                 tableDescriptor.addFamily(hColumnDescriptor);
 81             }
 82             try {
 83                 hAdmin.createTable(tableDescriptor);
 84             } catch (IOException e) {
 85                 e.printStackTrace();
 86             }
 87             System.out.println("The table ["   tableName   "]  is created.");
 88         } else {
 89             System.out.println("The table ["   tableName   "]  is existing already.");
 90         }
 91 
 92     }
 93 
 94     @Override
 95     public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
 96         if (isHTableExist(tableName)) {
 97             HTableInterface table = getHTableFromPool(tableName);
 98             Put put = new Put(rowKey.getBytes());
 99             put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
100             try {
101                 table.put(put);
102             } catch (IOException e) {
103                 e.printStackTrace();
104             }
105             System.out.println("Insert into table ["   tableName   "], Rowkey=["   rowKey   "], Column=["   columnFamily   ":"   column   "], Vlaue=["   value   "].");
106             closeTable(table);
107         } else {
108             System.out.println("The table ["   tableName   "] does not exist.");
109         }
110     }
111 
112     @Override
113     public void getRow(String tableName, String rowKey) {
114         if (isHTableExist(tableName)) {
115             HTableInterface table = getHTableFromPool(tableName);
116             Get get = new Get(rowKey.getBytes());
117             Result result;
118             try {
119                 result = table.get(get);
120                 String columnName = "";
121                 String timeStamp = "";
122                 String columnFamily = "";
123                 String value = "";
124                 for (Cell cell : result.rawCells()) {
125                     timeStamp = String.valueOf(cell.getTimestamp());
126                     columnFamily = new String(CellUtil.cloneFamily(cell));
127                     columnName = new String(CellUtil.cloneQualifier(cell));
128                     value = new String(CellUtil.cloneValue(cell));
129 
130                     System.out.println("Get from table ["   tableName   "], Rowkey=["   rowKey   "], Column=["   columnFamily   ":"   columnName   "], Timestamp=["   timeStamp   "], Vlaue=["   value   "].");
131                 }
132             } catch (IOException e) {
133                 e.printStackTrace();
134             }
135             closeTable(table);
136         } else {
137             System.out.println("The table ["   tableName   "] does not exist.");
138         }
139     }
140 
141     @Override
142     public void getAllRows(String tableName) {
143         if (isHTableExist(tableName)) {
144             Scan scan = new Scan();
145             scanHTable(tableName, scan);
146         } else {
147             System.out.println("The table ["   tableName   "] does not exist.");
148         }
149     }
150 
151     private void scanHTable(String tableName, Scan scan) {
152         try {
153             HTableInterface table = getHTableFromPool(tableName);
154             ResultScanner results = table.getScanner(scan);
155             for (Result result : results) {
156                 String rowKey = "";
157                 String columnName = "";
158                 String timeStamp = "";
159                 String columnFamily = "";
160                 String value = "";
161                 for (Cell cell : result.rawCells()) {
162                     rowKey = new String(CellUtil.cloneRow(cell));
163                     timeStamp = String.valueOf(cell.getTimestamp());
164                     columnFamily = new String(CellUtil.cloneFamily(cell));
165                     columnName = new String(CellUtil.cloneQualifier(cell));
166                     value = new String(CellUtil.cloneValue(cell));
167 
168                     System.out.println("Get from table ["   tableName   "], Rowkey=["   rowKey   "], Column=["   columnFamily   ":"   columnName   "], Timestamp=["   timeStamp   "], Vlaue=["   value   "].");
169                 }
170             }
171             closeTable(table);
172         } catch (IOException e) {
173             e.printStackTrace();
174         }
175     }
176 
177     @Override
178     public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
179         if (isHTableExist(tableName)) {
180             Scan scan = new Scan();
181             scan.setStartRow(startRowKey.getBytes());
182             // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).
183             // the hbase version is 0.98.9
184             scan.setStopRow(endRowKey.getBytes());
185             scanHTable(tableName, scan);
186         } else {
187             System.out.println("The table ["   tableName   "] does not exist.");
188         }
189     }
190 
191     @Override
192     public void delRow(String tableName, String rowKey) {
193         if (isHTableExist(tableName)) {
194             HTableInterface table = getHTableFromPool(tableName);
195             deleteRow(table, rowKey);
196         } else {
197             System.out.println("The table ["   tableName   "] does not exist.");
198         }
199     }
200 
201     private void deleteRow(HTableInterface table, String rowKey) {
202         Delete del = new Delete(rowKey.getBytes());
203         try {
204             table.delete(del);
205             System.out.println("Delete from table ["   new String(table.getTableName())   "], Rowkey=["   rowKey   "].");
206             closeTable(table);
207         } catch (IOException e) {
208             e.printStackTrace();
209         }
210     }
211 
212     @Override
213     public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
214         if (rowKeys != null && rowKeys.size() > 0) {
215             for (String rowKey : rowKeys) {
216                 delRow(tableName, rowKey);
217             }
218         }
219     }
220 
221     @Override
222     public void deleteHTable(String tableName) {
223         if (isHTableExist(tableName)) {
224             try {
225                 hAdmin.disableTable(tableName.getBytes());
226                 hAdmin.deleteTable(tableName.getBytes());
227                 System.out.println("The table ["   tableName   "] is deleted.");
228             } catch (IOException e) {
229                 e.printStackTrace();
230             }
231         } else {
232             System.out.println("The table ["   tableName   "] does not exist.");
233         }
234 
235     }
236 
237     @Override
238     public void closeAutoFlush(HTableInterface table) {
239         table.setAutoFlush(false, false);
240     }
241 
242     @Override
243     public void closeTable(HTableInterface table) {
244         try {
245             table.close();
246         } catch (IOException e) {
247             e.printStackTrace();
248         }
249     }
250 
251     @Override
252     public void closePoolConnection() {
253         try {
254             pool.close();
255         } catch (IOException e) {
256             e.printStackTrace();
257         }
258     }
259 
260 }

测试类:

/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java

代码语言:java
 1 package com.b510.hbase.util.dao.test;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.junit.Test;
 7 
 8 import com.b510.hbase.util.dao.HbaseDao;
 9 import com.b510.hbase.util.dao.impl.HbaseDaoImpl;
10 
11 /**
12  * @author Hongten
13  * @created 7 Nov 2018
14  */
15 public class HbaseDaoTest {
16 
17     HbaseDao dao = new HbaseDaoImpl(4);
18 
19     public static final String tableName = "t_test";
20     public static final String columnFamilyName = "cf1";
21     public static final String[] CFs = { columnFamilyName };
22 
23     public static final String COLUMN_NAME_NAME = "name";
24     public static final String COLUMN_NAME_AGE = "age";
25 
26     @Test
27     public void main() {
28         createTable();
29         addRow();
30         getRow();
31         getAllRows();
32         getRowsByRange();
33         delRow();
34         delRowsByRowKeys();
35         deleteHTable();
36     }
37 
38     public void createTable() {
39         System.out.println("=== create table ====");
40         dao.createHTable(tableName, CFs);
41     }
42 
43     public void addRow() {
44         System.out.println("=== insert record ====");
45         dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
46         dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
47 
48         dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
49         dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
50 
51         dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
52         dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
53 
54         dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
55         dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
56     }
57 
58     public void getRow() {
59         System.out.println("=== get record ====");
60         dao.getRow(tableName, "12345566");
61     }
62 
63     public void getAllRows() {
64         System.out.println("=== scan table ====");
65         dao.getAllRows(tableName);
66     }
67 
68     public void getRowsByRange() {
69         System.out.println("=== scan record by giving range ====");
70         // it will return the '12345567' and '12345568' rows.
71         dao.getRowsByRange(tableName, "12345567", "12345569");
72     }
73 
74     public void delRow() {
75         System.out.println("=== delete record ====");
76         dao.delRow(tableName, "12345568");
77         // only '12345567' row.
78         getRowsByRange();
79     }
80 
81     public void delRowsByRowKeys() {
82         System.out.println("=== delete batch records ====");
83         List<String> rowKeys = new ArrayList<String>();
84         rowKeys.add("12345566");
85         rowKeys.add("12345569");
86         dao.delRowsByRowKeys(tableName, rowKeys);
87         // can not find the '12345566' and '12345569'
88         getAllRows();
89     }
90 
91     public void deleteHTable() {
92         System.out.println("=== delete table ====");
93         dao.deleteHTable(tableName);
94     }
95 }

测试结果:

代码语言:text
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
=== create table ====
The table [t_test]  is created.
=== insert record ====
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten].
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24].
=== get record ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
=== scan table ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
=== delete record ====
Delete from table [t_test], Rowkey=[12345568].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
=== delete batch records ====
Delete from table [t_test], Rowkey=[12345566].
Delete from table [t_test], Rowkey=[12345569].
=== scan table ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
=== delete table ====
The table [t_test] is deleted.

源码下载:

hbase-util.zip

0 人点赞