// Code language: Java
package org.ucas.hbase;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class Hw1Grp2 {
//hbase 表名
private static final String TABLE_NAME = "Result";
//列簇名
private static final String COLMUN_FAMILY = "res";
private HTable table;
public HTable getTable() {
return table;
}
public void setTable(HTable table) {
this.table = table;
}
public BufferedReader readHdfs(String file) throws IOException, URISyntaxException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);
Path path = new Path(file);
FSDataInputStream inStream = fs.open(path);
BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
return in;
}
public HTable createTable(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
Configuration configuration = HBaseConfiguration.create();
HBaseAdmin hAdmin = new HBaseAdmin(configuration);
if(hAdmin.tableExists(tableName)) {
System.out.println("table is exists, delete exists table");
hAdmin.disableTable(tableName);
hAdmin.deleteTable(tableName);
} else {
System.out.println("table not exists");
}
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cf = new HColumnDescriptor(COLMUN_FAMILY);
htd.addFamily(cf);
hAdmin.createTable(htd);
hAdmin.close();
System.out.println("table create");
return new HTable(configuration,tableName);
}
public void insert(String rowKey, String family, String qualifier, String value) throws IOException {
Put put = new Put(rowKey.getBytes());
put.add(family.getBytes(),qualifier.getBytes(),value.getBytes());
table.put(put);
}
public void handleData(String file, int rowKey, Map<String, Integer> args) throws IOException, URISyntaxException {
String colStr = null;
BufferedReader buffer = readHdfs(file);
//rowKey和count哈希表
Map<String, Integer> mapCount = new HashMap<String, Integer>();
//rowKey 的某列sum哈希表
Map<String, Integer> mapSum = new HashMap<String, Integer>();
//max哈希表
Map<String, Integer> mapMax = new HashMap<String, Integer>();
//avg哈希表
Map<String, Float> mapAvg = new HashMap<String, Float>();
//min哈希表
Map<String, Integer> mapMin = new HashMap<String, Integer>();
int maxCol = -1, avgCol = -1, sumCol = -1, minCol = -1, countCol = -1;
//根据传进来的参数设置需要进行的聚合函数
if(args.containsKey("count")) {
countCol = args.get("count");
}
if(args.containsKey("avg")) {
avgCol = args.get("avg");
}
if(args.containsKey("max")) {
maxCol = args.get("max");
}
if(args.containsKey("sum")) {
sumCol = args.get("sum");
}
if(args.containsKey("min")) {
minCol = args.get("min");
}
//算出需要用到的聚合函数
String str;
while((str = buffer.readLine()) != null) {
String[] col = str.split("\|");
if(mapCount.containsKey(col[rowKey])) {
mapCount.put(col[rowKey], mapCount.get(col[rowKey]) 1 );
} else {
mapCount.put(col[rowKey], 1);
}
if(sumCol != -1) {
if(mapSum.containsKey(col[rowKey])) {
mapSum.put(col[rowKey], mapSum.get(col[rowKey]) Integer.parseInt(col[sumCol]) );
} else {
mapSum.put(col[rowKey], Integer.parseInt(col[sumCol]));
}
}
if(avgCol != -1) {
if(mapAvg.containsKey(col[rowKey])) {
mapAvg.put(col[rowKey], mapAvg.get(col[rowKey]) Float.parseFloat(col[avgCol]) );
} else {
mapAvg.put(col[rowKey], Float.parseFloat(col[avgCol]));
}
}
if(maxCol != -1) {
if(mapMax.containsKey(col[rowKey])) {
if(Integer.parseInt(col[maxCol]) > mapMax.get(col[rowKey]))
mapMax.put(col[rowKey], Integer.parseInt(col[maxCol]));
} else {
mapMax.put(col[rowKey], Integer.parseInt(col[maxCol]));
}
}
if(minCol != -1) {
if(mapMin.containsKey(col[rowKey])) {
if(Integer.parseInt(col[minCol]) < mapMin.get(col[rowKey]))
mapMin.put(col[rowKey], Integer.parseInt(col[minCol]));
} else {
mapMin.put(col[rowKey], Integer.parseInt(col[minCol]));
}
}
}
//从hashmap中插入数据表
for(String key : mapCount.keySet()) {
if(countCol != -1) {
colStr = "count";
insert(key, "res", colStr, mapCount.get(key) "");
}
if(avgCol != -1) {
colStr = "avg(R" avgCol ")";
mapAvg.put(key, (float)Math.round(mapAvg.get(key)/mapCount.get(key)*100)/100);
insert(key, "res", colStr, mapAvg.get(key) "");
}
if(maxCol != -1) {
colStr = "max(R" maxCol ")";
insert(key, "res", colStr, mapMax.get(key) "");
}
if(minCol != -1) {
colStr = "min(R" minCol ")";
insert(key, "res", colStr, mapMin.get(key) "");
}
if(sumCol != -1) {
colStr = "sum(R" sumCol ")";
insert(key, "res", colStr, mapSum.get(key) "");
}
}
System.out.println("handle data success");
}
public static void main(String[] args) throws IOException, URISyntaxException {
/**
* 命令参数解析,解析出文件名,group by的列,需要求的聚合函数
*/
if(args.length != 3) {
System.out.println("input args length error");
System.exit(0);
}
String file = StringUtils.substringAfter(args[0], "=");
if(file == null) {
System.out.println("args error");
System.exit(0);
}
String keyNum = StringUtils.substringAfter(args[1], "R");
if(keyNum == null) {
System.out.println("args error");
System.exit(0);
}
int rowKey = Integer.parseInt(keyNum);
String colsName = StringUtils.substringAfter(args[2], ":");
if(colsName == null) {
System.out.println("args error");
System.exit(0);
}
String[] cmdStr = colsName.split(",");
Map<String, Integer> cmd = new HashMap<String, Integer>();
for(int i = 0; i < cmdStr.length; i ) {
if(!cmdStr[i].equals("count")) {
cmd.put(StringUtils.substringBefore(cmdStr[i], "("), Integer.parseInt(StringUtils.substringBetween(cmdStr[i],"R", ")")));
} else {
cmd.put(cmdStr[i], rowKey);
}
}
System.out.println("file:" file);
for(String key : cmd.keySet()) {
System.out.println(key ":" cmd.get(key));
}
Hw1Grp2 h = new Hw1Grp2();
h.setTable(h.createTable(TABLE_NAME));
h.handleData(file, rowKey, cmd);
System.out.println("program is over");
}
}