HBase 配置
hbase-site.xml
<configuration>
    <!-- Run HBase as a distributed cluster (region server separate from master) -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- Local scratch directory; relative path resolves against the HBase start dir -->
    <property>
        <name>hbase.tmp.dir</name>
        <value>./tmp</value>
    </property>
    <!-- Skip the hflush/hsync capability check (needed on some local/dev filesystems) -->
    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>
    <!-- Where HBase persists its data: an HDFS path on the NameNode hadoop-1 -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop-1:9000/hbase</value>
    </property>
    <!-- ZooKeeper ensemble used for cluster coordination and client discovery -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop-1</value>
    </property>
</configuration>
Java 客户端 HBase操作
添加依赖
<!-- HBase Java client API (Connection, Admin, Table, ...) -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.3</version>
</dependency>
获取连接
代码语言:javascript复制private static Connection getConn() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hadoop.rootdir", "hdfs://hadoop-1:9000/hbase");
conf.set("hbase.zookeeper.quorum", "hadoop-1");
return ConnectionFactory.createConnection(conf);
}
添加表
代码语言:javascript复制private static void createTable(String tableName, List<String> columnNames) throws IOException {
Connection conn = getConn();
Admin admin = conn.getAdmin();
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
List<ColumnFamilyDescriptor> families = new ArrayList<>();
for (String columnName : columnNames) {
families.add(ColumnFamilyDescriptorBuilder.of(columnName));
}
tableDescriptorBuilder.setColumnFamilies(families);
admin.createTable(tableDescriptorBuilder.build());
close(conn);
}
比如我们调用:
createTable("emp", Arrays.asList("id", "name"));
然后使用 desc 'emp'
可以看到创建好的表结构
hbase(main):013:0* desc 'emp'
Table emp is ENABLED
emp
COLUMN FAMILIES DESCRIPTION
{NAME => 'id', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'fals
e', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_
MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
{NAME => 'name', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'fa
lse', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', I
N_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
2 row(s)
QUOTAS
0 row(s)
Took 4.3331 seconds
删除表
删除表之前需要表的状态是disable状态,所以需要先禁用表
代码语言:javascript复制private static void deleteTable(String tableName) throws IOException {
Connection conn = getConn();
Admin admin = conn.getAdmin();
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
close(conn);
}
添加、修改记录(整行、列族、列)
可以addColumn来设置多个值
代码语言:javascript复制private static void insert(String tableName, String rowKey, String family, String qualifier, String value) throws IOException {
Connection conn = getConn();
Table table = conn.getTable(TableName.valueOf(tableName));
table.put(new Put(Bytes.toBytes(rowKey)).addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier),
Bytes.toBytes(value)));
close(conn);
}
删除记录(整行、列族、列)
如果指定了 family或者 qualifier则删除的是部分字段,否则是删除整行
代码语言:javascript复制private static void delete(String tableName, String rowKey, String family, String qualifier) throws IOException {
Connection conn = getConn();
Table table = conn.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
if (StringUtils.isNotBlank(qualifier)) {
delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
} else if (StringUtils.isNotBlank(family)) {
delete.addFamily(Bytes.toBytes(family));
}
table.delete(delete);
close(conn);
}
获取单条数据(整行、列族、列)
代码语言:javascript复制private static Map<String, String> get(String tableName, String rowKey) throws IOException {
Connection conn = getConn();
Table table = conn.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
Map<String, String> map = new HashMap<>();
for (Cell listCell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(listCell));
String columnName = Bytes.toString(CellUtil.cloneQualifier(listCell));
String value = Bytes.toString(CellUtil.cloneValue(listCell));
map.put(family ":" columnName, value);
}
close(conn);
return map;
}
获取多条数据
代码语言:javascript复制public static List<Map<String, String>> scan(String tableName) throws IOException {
Connection conn = getConn();
Table table = conn.getTable(TableName.valueOf(tableName));
ResultScanner scanner = table.getScanner(new Scan());
List<Map<String, String>> list = new ArrayList<>();
for (Result result : scanner) {
Map<String, String> rowToMap = getRowToMap(result);
list.add(rowToMap);
}
close(conn);
return list;
}
private static Map<String, String> getRowToMap(Result result) {
Map<String, String> map = new HashMap<>();
for (Cell listCell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(listCell));
String columnName = Bytes.toString(CellUtil.cloneQualifier(listCell));
String value = Bytes.toString(CellUtil.cloneValue(listCell));
map.put(family ":" columnName, value);
}
return map;
}
如果查询指定范围内的数据:
new Scan().withStartRow(Bytes.toBytes(start)).withStopRow(Bytes.toBytes(end))
默认只查询最后一个版本的数据,如果需要查询多个版本的数据可以设置
new Scan().withStartRow(Bytes.toBytes(start)).withStopRow(Bytes.toBytes(end)).readVersions(2)
导入数据到HBASE,使用MapReduce统计次数
假设是这些数据:
1_song1_2016-1-11 song1 singer1 man slow pc
2_song2_2016-1-11 song2 singer2 woman slow ios
3_song3_2016-1-11 song3 singer3 man quick andriod
4_song4_2016-1-11 song4 singer4 woman slow ios
5_song5_2016-1-11 song5 singer5 man quick pc
6_song6_2016-1-11 song6 singer6 woman quick ios
7_song7_2016-1-11 song7 singer7 man quick andriod
8_song8_2016-1-11 song8 singer8 woman slow pc
9_song9_2016-1-11 song9 singer9 woman slow ios
10_song4_2016-1-11 song4 singer4 woman slow ios
11_song6_2016-1-11 song6 singer6 woman quick ios
12_song6_2016-1-11 song6 singer6 woman quick ios
13_song3_2016-1-11 song3 singer3 man quick andriod
14_song2_2016-1-11 song2 singer2 woman slow ios
导入数据
在HDFS中创建一个目录,保存原始文件
hadoop fs -mkdir -p hdfs://hadoop-1:9000/input2/music2
上传文件到hdfs中
hadoop fs -put music1.txt hdfs://hadoop-1:9000/input2/music2
在 HBase Shell 中创建表
create 'music','info'
导入tsv文件到 music
表中
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:signer,info:gender,info:ryghme,info:terminal music /input2/music2/music1.txt
计算统计
HBase Shell 执行创建存放统计信息的表(表名需与后文 Job 中 initTableReducerJob 使用的输出表 music_top 一致)
create 'music_top','details'
编写Map 和 Reduce 类
代码语言:javascript复制public class MusicPlayCountMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
InterruptedException {
for (Cell listCell : value.listCells()) {
context.write(new Text(Bytes.toString(CellUtil.cloneValue(listCell))), new IntWritable(1));
}
}
}
代码语言:javascript复制public class MusicCountReducer extends TableReducer<Text, IntWritable, Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int rank = 0;
for (IntWritable value : values) {
rank = value.get();
}
context.write(key, new Put(Bytes.toBytes(key.toString())).addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(String.valueOf(rank))).addColumn(Bytes.toBytes("details"), Bytes.toBytes("song"), Bytes.toBytes(key.toString())))
}
}
编写一个运行job的方法:
代码语言:javascript复制public static void mapReduceMusic() throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
Job job = Job.getInstance(conf, "top-music");
job.setMapperClass(MusicPlayCountMapper.class);
job.setReducerClass(MusicCountReducer.class);
job.setJarByClass(HbaseDemo.class);
job.setNumReduceTasks(1);
TableMapReduceUtil.initTableMapperJob("music", new Scan().addColumn(
Bytes.toBytes("info"),
Bytes.toBytes("name")
), MusicPlayCountMapper.class, Text.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob("music_top", MusicCountReducer.class, job);
job.waitForCompletion(true);
}
运行之前需要做一件事,由于默认运行的时候是在本地运行job,所以需要所有的hbase的依赖jar,所以需要把hbase下的lib中的jar都加入到classpath中。
运行此方法后,可以通过 之前写的 scan 来查询 music_top
表中的数据:
[{details:rank=1, details:song=song1}, {details:rank=2, details:song=song2}, {details:rank=2, details:song=song3}, {details:rank=2, details:song=song4}, {details:rank=1, details:song=song5}, {details:rank=3, details:song=song6}, {details:rank=1, details:song=song7}, {details:rank=1, details:song=song8}, {details:rank=1, details:song=song9}]