HBase
Installation
I'm installing with Docker here, so I'll just give the commands directly.
For the first start, run:
docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase
This sets the container's hostname, and therefore HBase's, to myhbase. For subsequent starts, run:
docker start hbase
To get an HBase shell, run:
docker exec -it hbase /bin/bash
hbase shell
Shell
Check cluster status: status
hbase> status
List all tables: list
hbase> list
Create a table: create
hbase> create 'FileTable','fileInfo','saveInfo'
Show table details: desc
hbase> desc 'FileTable'
Add a column family: alter
hbase> alter 'FileTable','cf'
Delete a column family: alter
hbase> alter 'FileTable',{NAME=>'cf',METHOD=>'delete'}
Insert data: put
hbase> put 'FileTable','rowKey1','fileInfo:name','file1.txt'
hbase> put 'FileTable','rowKey1','fileInfo:type','txt'
hbase> put 'FileTable','rowKey1','fileInfo:size','1024'
hbase> put 'FileTable','rowKey1','saveInfo:path','/home'
hbase> put 'FileTable','rowKey1','saveInfo:creator','tom'
Count rows: count
hbase> count 'FileTable'
Query data: get
hbase> get 'FileTable','rowKey1'
Read only a given column family:
hbase> get 'FileTable','rowKey1','fileInfo'
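A single column can be read the same way, by naming the column family plus qualifier:
hbase> get 'FileTable','rowKey1','fileInfo:name'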
Scan the whole table: scan
hbase> scan 'FileTable'
Restrict the scan to a column family:
hbase> scan 'FileTable',{COLUMN=>'fileInfo'}
Add other conditions:
hbase> scan 'FileTable',{STARTROW=>'rowKey1',LIMIT=>1,VERSIONS=>1}
Delete a cell: delete
hbase> delete 'FileTable','rowKey1','fileInfo:size'
Delete an entire row:
hbase> deleteall 'FileTable','rowKey1'
Drop a table: disable, then drop (a table must be disabled before it can be dropped)
hbase> disable 'FileTable'
hbase> drop 'FileTable'
Check whether a table is enabled or disabled: is_enabled / is_disabled
hbase> is_enabled 'FileTable'
hbase> is_disabled 'FileTable'
Check whether a table exists: exists
hbase> exists 'FileTable'
Connecting to HBase with the Java API
The HBase connection class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/**
 * Singleton HBase connection
 *
 * @author: EarthChen
 * @date: 2018/07/27
 */
public class HbaseConn {

    /**
     * Singleton instance
     */
    private static final HbaseConn INSTANCE = new HbaseConn();

    /**
     * HBase connection configuration
     */
    private static Configuration configuration;

    /**
     * The shared connection
     */
    private static Connection connection;

    private HbaseConn() {
        try {
            if (configuration == null) {
                configuration = HBaseConfiguration.create();
                configuration.set("hbase.zookeeper.quorum", "172.17.0.1");
                configuration.set("hbase.zookeeper.property.clientPort", "2181");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Connection getConnection() {
        if (connection == null || connection.isClosed()) {
            try {
                connection = ConnectionFactory.createConnection(configuration);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return connection;
    }

    /**
     * Get the HBase connection
     */
    public static Connection getHbaseConn() {
        return INSTANCE.getConnection();
    }

    /**
     * Get a table by name
     */
    public static Table getTable(String tableName) throws IOException {
        return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
    }

    /**
     * Close the connection
     */
    public static void closeConn() {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
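One caveat about the class above: the lazy initialization in getConnection() is not synchronized, so two threads calling it at the same time could each create a Connection. Since HBase's Connection is heavyweight and meant to be shared, creating it exactly once is the right pattern. A minimal sketch of a thread-safe variant, as a drop-in replacement for the private getConnection() above:

// Drop-in replacement for HbaseConn#getConnection(): synchronizing the
// lazy initialization ensures only one Connection is ever created, even
// when several threads race on the first call.
private synchronized Connection getConnection() {
    if (connection == null || connection.isClosed()) {
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    return connection;
}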
Note:
- Since I'm running HBase in a virtual machine, the hostname-to-IP mapping needs to be configured in the local hosts file, in my case 172.17.0.1 myhbase (see the example entry after the stack trace below); otherwise the client complains about the unknown hostname myhbase.
- If the hosts entry is missing, you will see:
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Sat Jul 28 21:51:33 CST 2018, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68531: myhbase row 'FileTable,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=myhbase,16201,1532771272296, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:276)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:210)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:212)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:164)
at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:159)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:796)
at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602)
at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:408)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:418)
at com.earthchen.hbase.api.HbaseUtil.createTable(HbaseUtil.java:33)
at com.earthchen.hbase.api.HbaseUtilTest.createTable(HbaseUtilTest.java:15)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=68531: myhbase row 'FileTable,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=myhbase,16201,1532771272296, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:171)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: myhbase
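For reference, the hosts entry itself is a single line mapping the container's hostname to the Docker bridge IP (the file is /etc/hosts on Linux/macOS; C:\Windows\System32\drivers\etc\hosts on Windows):

172.17.0.1 myhbase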
Unit tests for the connection class:
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

/**
 * @author: EarthChen
 * @date: 2018/07/27
 */
public class HbaseConnTest {

    @Test
    public void getConnTest() {
        Connection connection = HbaseConn.getHbaseConn();
        Assert.assertFalse(connection.isClosed());
        HbaseConn.closeConn();
        Assert.assertTrue(connection.isClosed());
    }

    @Test
    public void getTableTest() {
        try {
            Table table = HbaseConn.getTable("test");
            System.out.println(table.getName().getNameAsString());
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Basic operations on HBase tables:
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Create/read/delete helpers for HBase.
 * <p>
 * (All HBase data are byte arrays, and HBase has no separate update
 * operation: a put to an existing row/column simply writes a new version.)
 *
 * @author: EarthChen
 * @date: 2018/07/28
 */
public class HbaseUtil {

    /**
     * Create an HBase table.
     *
     * @param tableName table name
     * @param cfs       array of column family names
     * @return whether the table was created
     */
    public static boolean createTable(String tableName, String[] cfs) {
        try (HBaseAdmin admin = (HBaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {
            if (admin.tableExists(tableName)) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // Describe each column family
            Arrays.stream(cfs).forEach(cf -> {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(1);
                // Add the column family to the table
                tableDescriptor.addFamily(columnDescriptor);
            });
            // Create the table
            admin.createTable(tableDescriptor);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Insert one cell into a row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param cfName    column family name
     * @param qualifier column qualifier
     * @param data      the value
     * @return whether the put succeeded
     */
    public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier, String data) {
        // Get a Table instance
        try (Table table = HbaseConn.getTable(tableName)) {
            // Build the Put
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));
            // Write it
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Insert a batch of rows.
     *
     * @param tableName table name
     * @param putList   list of Puts
     * @return whether the batch put succeeded
     */
    public static boolean putRows(String tableName, List<Put> putList) {
        try (Table table = HbaseConn.getTable(tableName)) {
            table.put(putList);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Get a single row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return the Result, or null on failure
     */
    public static Result getRow(String tableName, String rowKey) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            return table.get(get);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Get a single row through a filter list.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param filterList filters to apply to the get
     * @return the Result, or null on failure
     */
    public static Result getRow(String tableName, String rowKey, FilterList filterList) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.setFilter(filterList);
            return table.get(get);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Scan the whole table, with a scanner cache of 1000 rows.
     *
     * @param tableName table name
     * @return a ResultScanner, or null on failure
     */
    public static ResultScanner getScanner(String tableName) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Scan scan = new Scan();
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Scan a row-key range (the stop row is exclusive).
     *
     * @param tableName   table name
     * @param startRowKey start row key (inclusive)
     * @param endRowKey   stop row key (exclusive)
     * @return a ResultScanner, or null on failure
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRowKey));
            scan.setStopRow(Bytes.toBytes(endRowKey));
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Scan a row-key range through a filter list.
     *
     * @param tableName   table name
     * @param startRowKey start row key (inclusive)
     * @param endRowKey   stop row key (exclusive)
     * @param filterList  filters to apply to the scan
     * @return a ResultScanner, or null on failure
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, FilterList filterList) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRowKey));
            scan.setStopRow(Bytes.toBytes(endRowKey));
            // Apply the filters
            scan.setFilter(filterList);
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Delete an entire row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return whether the delete succeeded
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Delete a column family.
     *
     * @param tableName table name
     * @param cfName    column family name
     * @return whether the delete succeeded
     */
    public static boolean deleteColumnFamily(String tableName, String cfName) {
        try (HBaseAdmin admin = (HBaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {
            admin.deleteColumn(tableName, cfName);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Delete a single cell (column family + qualifier) of a row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param cfName    column family name
     * @param qualifier column qualifier
     * @return whether the delete succeeded
     */
    public static boolean deleteQualifier(String tableName, String rowKey, String cfName, String qualifier) {
        try (Table table = HbaseConn.getTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Delete a table: it must be disabled before it can be deleted
     * (used by the deleteTable unit test below).
     *
     * @param tableName table name
     * @return whether the delete succeeded
     */
    public static boolean deleteTable(String tableName) {
        try (HBaseAdmin admin = (HBaseAdmin) HbaseConn.getHbaseConn().getAdmin()) {
            if (!admin.tableExists(tableName)) {
                return false;
            }
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
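The putRows helper above is not exercised by the tests that follow; here is a minimal sketch of building a Put list for it, reusing the FileTable layout from earlier (the PutRowsExample class name and row keys rowKey3 to rowKey5 are made up for illustration):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutRowsExample {
    public static void main(String[] args) {
        List<Put> putList = new ArrayList<>();
        for (int i = 3; i <= 5; i++) {
            // One Put per row key; a single Put can carry several columns.
            Put put = new Put(Bytes.toBytes("rowKey" + i));
            put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"), Bytes.toBytes("file" + i + ".txt"));
            put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("size"), Bytes.toBytes("1024"));
            putList.add(put);
        }
        // All three rows go to the server in one batched call.
        HbaseUtil.putRows("FileTable", putList);
    }
}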
A few unit tests:
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * @author: EarthChen
 * @date: 2018/07/28
 */
public class HbaseUtilTest {

    @Test
    public void createTable() {
        HbaseUtil.createTable("FileTable", new String[]{"fileInfo", "saveInfo"});
    }

    @Test
    public void putRow() {
        HbaseUtil.putRow("FileTable", "rowKey1", "fileInfo", "name", "file1.txt");
        HbaseUtil.putRow("FileTable", "rowKey1", "fileInfo", "type", "txt");
        HbaseUtil.putRow("FileTable", "rowKey1", "fileInfo", "size", "1024");
        HbaseUtil.putRow("FileTable", "rowKey1", "saveInfo", "creator", "earthchen");
        HbaseUtil.putRow("FileTable", "rowKey2", "fileInfo", "name", "file1.txt");
        HbaseUtil.putRow("FileTable", "rowKey2", "fileInfo", "type", "txt");
        HbaseUtil.putRow("FileTable", "rowKey2", "fileInfo", "size", "1024");
        HbaseUtil.putRow("FileTable", "rowKey2", "saveInfo", "creator", "earthchen");
    }

    @Test
    public void getRow() {
        Result result = HbaseUtil.getRow("FileTable", "rowKey1");
        if (result != null) {
            System.out.println(Bytes.toString(result.getRow()));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
        }
    }

    @Test
    public void getScanner() {
        // The stop row of a scan is exclusive, so scan up to "rowKey3" to include rowKey2.
        ResultScanner scanner = HbaseUtil.getScanner("FileTable", "rowKey2", "rowKey3");
        if (scanner != null) {
            scanner.forEach(result -> {
                System.out.println(Bytes.toString(result.getRow()));
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
            });
            scanner.close();
        }
    }

    @Test
    public void deleteRow() {
        HbaseUtil.deleteRow("FileTable", "rowKey2");
    }

    @Test
    public void deleteTable() {
        HbaseUtil.deleteTable("FileTable");
    }
}
Filters
The following test methods live in the same test class as above; they additionally need imports from org.apache.hadoop.hbase.filter.* and java.util.Collections.
/**
 * Row filter
 */
@Test
public void rowFilterTest() {
    Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("rowKey1")));
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            Collections.singletonList(filter));
    ResultScanner resultScanner = HbaseUtil.getScanner("FileTable", "rowKey1", "rowKey3", filterList);
    if (resultScanner != null) {
        resultScanner.forEach(result -> {
            System.out.println(Bytes.toString(result.getRow()));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
        });
        resultScanner.close();
    }
}

/**
 * Row-key prefix filter
 */
@Test
public void prefixFilterTest() {
    Filter filter = new PrefixFilter(Bytes.toBytes("rowKey"));
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            Collections.singletonList(filter));
    ResultScanner resultScanner = HbaseUtil.getScanner("FileTable", "rowKey1", "rowKey3", filterList);
    if (resultScanner != null) {
        resultScanner.forEach(result -> {
            System.out.println(Bytes.toString(result.getRow()));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
        });
        resultScanner.close();
    }
}

/**
 * Key-only filter
 */
@Test
public void keyOnlyFilerTest() {
    Filter filter = new KeyOnlyFilter(true);
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            Collections.singletonList(filter));
    ResultScanner resultScanner = HbaseUtil.getScanner("FileTable", "rowKey1", "rowKey3", filterList);
    if (resultScanner != null) {
        resultScanner.forEach(result -> {
            System.out.println(Bytes.toString(result.getRow()));
            // KeyOnlyFilter strips cell values (with lenAsVal=true each value is
            // replaced by its length), so only the row-key printout is meaningful.
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
        });
        resultScanner.close();
    }
}

/**
 * Column prefix filter
 */
@Test
public void columnPrefixFilter() {
    Filter filter = new ColumnPrefixFilter(Bytes.toBytes("nam"));
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            Collections.singletonList(filter));
    ResultScanner resultScanner = HbaseUtil.getScanner("FileTable", "rowKey1", "rowKey3", filterList);
    if (resultScanner != null) {
        resultScanner.forEach(result -> {
            System.out.println(Bytes.toString(result.getRow()));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("type"))));
        });
        resultScanner.close();
    }
}
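Every test above wraps a single filter in a FilterList; the point of FilterList is combining several. Below is a sketch using MUST_PASS_ONE (logical OR, versus the logical AND of MUST_PASS_ALL) over two filters already shown; the combinedFilterTest name is just for illustration, and it needs java.util.Arrays in addition to the imports above:

/**
 * Combined filters: MUST_PASS_ONE keeps a row if any member filter matches.
 */
@Test
public void combinedFilterTest() {
    Filter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("rowKey1")));
    Filter prefixFilter = new PrefixFilter(Bytes.toBytes("rowKey2"));
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE,
            Arrays.asList(rowFilter, prefixFilter));
    ResultScanner resultScanner = HbaseUtil.getScanner("FileTable", "rowKey1", "rowKey3", filterList);
    if (resultScanner != null) {
        // Matches rowKey1 (via the row filter) and rowKey2 (via the prefix filter).
        resultScanner.forEach(result -> System.out.println(Bytes.toString(result.getRow())));
        resultScanner.close();
    }
}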
Phoenix
Installation
Download the Phoenix release that matches your HBase version.
Copy the two jars into HBase's lib directory:
cp phoenix-core-4.14.0-HBase-1.3.jar ../hbase-1.3.1/lib/
cp phoenix-4.14.0-HBase-1.3-server.jar ../hbase-1.3.1/lib/
Then go into HBase's bin directory and restart HBase:
./stop-hbase.sh
./start-hbase.sh
To test it, go into Phoenix's bin directory and run:
./sqlline.py
Output like the following means it worked:
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
18/07/28 17:54:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 4.14)
Driver: PhoenixEmbeddedDriver (version 4.14)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
136/136 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:>
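At the prompt you can also run sqlline meta-commands; for example, !tables lists the tables Phoenix knows about:
0: jdbc:phoenix:> !tables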
CRUD on HBase through Phoenix
Create a table:
create table if not exists person (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(20),AGE INTEGER);
Query:
select * from person;
Insert data (upsert inserts a new row or updates an existing one):
upsert into person (id,name,age) values(100,'tom',22);
upsert into person (id,name,age) values(101,'tom1',22);
upsert into person (id,name,age) values(102,'tom2',22);
Alter the table structure (add a column):
alter table person add sex varchar(10);
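Since Phoenix ships a JDBC driver, the same table can also be used from Java with plain JDBC (with the Phoenix client jar on the classpath). A minimal sketch: the connection string assumes ZooKeeper is reachable at localhost:2181, the PhoenixJdbcExample class name is just for illustration, and note that Phoenix connections are not autocommit by default:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixJdbcExample {
    public static void main(String[] args) throws Exception {
        // URL format: jdbc:phoenix:<zookeeper quorum>:<port> -- assumes ZK on localhost:2181
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
            // upsert behaves as insert-or-update, matching the shell example above
            try (PreparedStatement ps = conn.prepareStatement(
                    "upsert into person (id,name,age) values (?,?,?)")) {
                ps.setInt(1, 103);
                ps.setString(2, "tom3");
                ps.setInt(3, 22);
                ps.executeUpdate();
            }
            // Phoenix does not autocommit by default, so commit explicitly.
            conn.commit();
            try (PreparedStatement ps = conn.prepareStatement("select id,name,age from person");
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getInt("id") + " " + rs.getString("name") + " " + rs.getInt("age"));
                }
            }
        }
    }
}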