HBase提供了丰富的Java API,其中HTablePool可以作为表连接池来复用表句柄。下面用HTablePool演示如何通过Java API操作HBase(建表、插入、查询、删除数据以及删表)。
代码语言:javascript复制 1 package com.b510.hbase.util.dao;
3 import java.util.List;
5 import org.apache.hadoop.hbase.client.HTableInterface;
8 /**
9 * @author Hongten
10 * @created 7 Nov 2018
11 */
12 public interface HbaseDao {
14 // initial table
15 public HTableInterface getHTableFromPool(String tableName);
17 // check if the table is exist
18 public boolean isHTableExist(String tableName);
20 // create table
21 public void createHTable(String tableName, String[] columnFamilys);
23 // insert new row
24 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
26 // get row by row key
27 public void getRow(String tableName, String rowKey);
29 public void getAllRows(String tableName);
31 // get rows by giving range
32 public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
34 //delete row
35 public void delRow(String tableName, String rowKey);
37 //delete rows by row keys
38 public void delRowsByRowKeys(String tableName, List<String> rowKeys);
40 // auto flush data when close
41 public void closeAutoFlush(HTableInterface table);
43 // close table
44 public void closeTable(HTableInterface table);
46 // close pool connection
47 public void closePoolConnection();
49 // delete table
50 public void deleteHTable(String tableName);
51 }
代码语言:javascript复制 1 package com.b510.hbase.util.dao.impl;
3 import java.io.IOException;
4 import java.util.List;
6 import org.apache.hadoop.conf.Configuration;
7 import org.apache.hadoop.hbase.Cell;
8 import org.apache.hadoop.hbase.CellUtil;
9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.HColumnDescriptor;
11 import org.apache.hadoop.hbase.HTableDescriptor;
12 import org.apache.hadoop.hbase.MasterNotRunningException;
13 import org.apache.hadoop.hbase.TableName;
14 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
15 import org.apache.hadoop.hbase.client.Delete;
16 import org.apache.hadoop.hbase.client.Get;
17 import org.apache.hadoop.hbase.client.HBaseAdmin;
18 import org.apache.hadoop.hbase.client.HTableInterface;
19 import org.apache.hadoop.hbase.client.HTablePool;
20 import org.apache.hadoop.hbase.client.Put;
21 import org.apache.hadoop.hbase.client.Result;
22 import org.apache.hadoop.hbase.client.ResultScanner;
23 import org.apache.hadoop.hbase.client.Scan;
25 import com.b510.hbase.util.dao.HbaseDao;
27 /**
28 * @author Hongten
29 * @created 7 Nov 2018
30 */
31 @SuppressWarnings("deprecation")
32 public class HbaseDaoImpl implements HbaseDao {
34 private static Configuration conf = null;
35 private static HBaseAdmin hAdmin;
36 private static HTablePool pool;
38 private static int defaultPoolSize = 5;
40 public HbaseDaoImpl(int poolSize) {
41 conf = HBaseConfiguration.create();
42 conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
43 try {
44 hAdmin = new HBaseAdmin(conf);
45 // the default pool size is 5.
46 pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
47 } catch (MasterNotRunningException e) {
48 e.printStackTrace();
49 } catch (ZooKeeperConnectionException e) {
50 e.printStackTrace();
51 } catch (IOException e) {
52 e.printStackTrace();
53 }
54 }
56 @Override
57 public HTableInterface getHTableFromPool(String tableName) {
58 HTableInterface table = pool.getTable(tableName);
59 return table;
60 }
62 @Override
63 public boolean isHTableExist(String tableName) {
64 try {
65 return hAdmin.tableExists(tableName);
66 } catch (IOException e) {
67 e.printStackTrace();
68 }
69 return false;
70 }
72 @Override
73 public void createHTable(String tableName, String[] columnFamilys) {
74 if (!isHTableExist(tableName)) {
75 HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
76 // The Hbase suggested the number of column family should be less than 3.
77 // Normally, there only have 1 column family.
78 for (String cfName : columnFamilys) {
79 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
80 tableDescriptor.addFamily(hColumnDescriptor);
81 }
82 try {
83 hAdmin.createTable(tableDescriptor);
84 } catch (IOException e) {
85 e.printStackTrace();
86 }
87 System.out.println("The table [" tableName "] is created.");
88 } else {
89 System.out.println("The table [" tableName "] is existing already.");
90 }
92 }
94 @Override
95 public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
96 if (isHTableExist(tableName)) {
97 HTableInterface table = getHTableFromPool(tableName);
98 Put put = new Put(rowKey.getBytes());
99 put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
100 try {
101 table.put(put);
102 } catch (IOException e) {
103 e.printStackTrace();
104 }
105 System.out.println("Insert into table [" tableName "], Rowkey=[" rowKey "], Column=[" columnFamily ":" column "], Vlaue=[" value "].");
106 closeTable(table);
107 } else {
108 System.out.println("The table [" tableName "] does not exist.");
109 }
110 }
112 @Override
113 public void getRow(String tableName, String rowKey) {
114 if (isHTableExist(tableName)) {
115 HTableInterface table = getHTableFromPool(tableName);
116 Get get = new Get(rowKey.getBytes());
117 Result result;
118 try {
119 result = table.get(get);
120 String columnName = "";
121 String timeStamp = "";
122 String columnFamily = "";
123 String value = "";
124 for (Cell cell : result.rawCells()) {
125 timeStamp = String.valueOf(cell.getTimestamp());
126 columnFamily = new String(CellUtil.cloneFamily(cell));
127 columnName = new String(CellUtil.cloneQualifier(cell));
128 value = new String(CellUtil.cloneValue(cell));
130 System.out.println("Get from table [" tableName "], Rowkey=[" rowKey "], Column=[" columnFamily ":" columnName "], Timestamp=[" timeStamp "], Vlaue=[" value "].");
131 }
132 } catch (IOException e) {
133 e.printStackTrace();
134 }
135 closeTable(table);
136 } else {
137 System.out.println("The table [" tableName "] does not exist.");
138 }
139 }
141 @Override
142 public void getAllRows(String tableName) {
143 if (isHTableExist(tableName)) {
144 Scan scan = new Scan();
145 scanHTable(tableName, scan);
146 } else {
147 System.out.println("The table [" tableName "] does not exist.");
148 }
149 }
151 private void scanHTable(String tableName, Scan scan) {
152 try {
153 HTableInterface table = getHTableFromPool(tableName);
154 ResultScanner results = table.getScanner(scan);
155 for (Result result : results) {
156 String rowKey = "";
157 String columnName = "";
158 String timeStamp = "";
159 String columnFamily = "";
160 String value = "";
161 for (Cell cell : result.rawCells()) {
162 rowKey = new String(CellUtil.cloneRow(cell));
163 timeStamp = String.valueOf(cell.getTimestamp());
164 columnFamily = new String(CellUtil.cloneFamily(cell));
165 columnName = new String(CellUtil.cloneQualifier(cell));
166 value = new String(CellUtil.cloneValue(cell));
168 System.out.println("Get from table [" tableName "], Rowkey=[" rowKey "], Column=[" columnFamily ":" columnName "], Timestamp=[" timeStamp "], Vlaue=[" value "].");
169 }
170 }
171 closeTable(table);
172 } catch (IOException e) {
173 e.printStackTrace();
174 }
175 }
177 @Override
178 public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
179 if (isHTableExist(tableName)) {
180 Scan scan = new Scan();
181 scan.setStartRow(startRowKey.getBytes());
182 // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).
183 // the hbase version is 0.98.9
184 scan.setStopRow(endRowKey.getBytes());
185 scanHTable(tableName, scan);
186 } else {
187 System.out.println("The table [" tableName "] does not exist.");
188 }
189 }
191 @Override
192 public void delRow(String tableName, String rowKey) {
193 if (isHTableExist(tableName)) {
194 HTableInterface table = getHTableFromPool(tableName);
195 deleteRow(table, rowKey);
196 } else {
197 System.out.println("The table [" tableName "] does not exist.");
198 }
199 }
201 private void deleteRow(HTableInterface table, String rowKey) {
202 Delete del = new Delete(rowKey.getBytes());
203 try {
204 table.delete(del);
205 System.out.println("Delete from table [" new String(table.getTableName()) "], Rowkey=[" rowKey "].");
206 closeTable(table);
207 } catch (IOException e) {
208 e.printStackTrace();
209 }
210 }
212 @Override
213 public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
214 if (rowKeys != null && rowKeys.size() > 0) {
215 for (String rowKey : rowKeys) {
216 delRow(tableName, rowKey);
217 }
218 }
219 }
221 @Override
222 public void deleteHTable(String tableName) {
223 if (isHTableExist(tableName)) {
224 try {
225 hAdmin.disableTable(tableName.getBytes());
226 hAdmin.deleteTable(tableName.getBytes());
227 System.out.println("The table [" tableName "] is deleted.");
228 } catch (IOException e) {
229 e.printStackTrace();
230 }
231 } else {
232 System.out.println("The table [" tableName "] does not exist.");
233 }
235 }
237 @Override
238 public void closeAutoFlush(HTableInterface table) {
239 table.setAutoFlush(false, false);
240 }
242 @Override
243 public void closeTable(HTableInterface table) {
244 try {
245 table.close();
246 } catch (IOException e) {
247 e.printStackTrace();
248 }
249 }
251 @Override
252 public void closePoolConnection() {
253 try {
254 pool.close();
255 } catch (IOException e) {
256 e.printStackTrace();
257 }
258 }
260 }
代码语言:javascript复制 1 package com.b510.hbase.util.dao.test;
3 import java.util.ArrayList;
4 import java.util.List;
6 import org.junit.Test;
8 import com.b510.hbase.util.dao.HbaseDao;
9 import com.b510.hbase.util.dao.impl.HbaseDaoImpl;
11 /**
12 * @author Hongten
13 * @created 7 Nov 2018
14 */
15 public class HbaseDaoTest {
17 HbaseDao dao = new HbaseDaoImpl(4);
19 public static final String tableName = "t_test";
20 public static final String columnFamilyName = "cf1";
21 public static final String[] CFs = { columnFamilyName };
23 public static final String COLUMN_NAME_NAME = "name";
24 public static final String COLUMN_NAME_AGE = "age";
26 @Test
27 public void main() {
28 createTable();
29 addRow();
30 getRow();
31 getAllRows();
32 getRowsByRange();
33 delRow();
34 delRowsByRowKeys();
35 deleteHTable();
36 }
38 public void createTable() {
39 System.out.println("=== create table ====");
40 dao.createHTable(tableName, CFs);
41 }
43 public void addRow() {
44 System.out.println("=== insert record ====");
45 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
46 dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
48 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
49 dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
51 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
52 dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
54 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
55 dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
56 }
58 public void getRow() {
59 System.out.println("=== get record ====");
60 dao.getRow(tableName, "12345566");
61 }
63 public void getAllRows() {
64 System.out.println("=== scan table ====");
65 dao.getAllRows(tableName);
66 }
68 public void getRowsByRange() {
69 System.out.println("=== scan record by giving range ====");
70 // it will return the '12345567' and '12345568' rows.
71 dao.getRowsByRange(tableName, "12345567", "12345569");
72 }
74 public void delRow() {
75 System.out.println("=== delete record ====");
76 dao.delRow(tableName, "12345568");
77 // only '12345567' row.
78 getRowsByRange();
79 }
81 public void delRowsByRowKeys() {
82 System.out.println("=== delete batch records ====");
83 List<String> rowKeys = new ArrayList<String>();
84 rowKeys.add("12345566");
85 rowKeys.add("12345569");
86 dao.delRowsByRowKeys(tableName, rowKeys);
87 // can not find the '12345566' and '12345569'
88 getAllRows();
89 }
91 public void deleteHTable() {
92 System.out.println("=== delete table ====");
93 dao.deleteHTable(tableName);
94 }
95 }
运行结果:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
=== create table ====
The table [t_test] is created.
=== insert record ====
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten].
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24].
=== get record ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
=== scan table ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
=== delete record ====
Delete from table [t_test], Rowkey=[12345568].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
=== delete batch records ====
Delete from table [t_test], Rowkey=[12345566].
Delete from table [t_test], Rowkey=[12345569].
=== scan table ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
=== delete table ====
The table [t_test] is deleted.