Hbase增删查改工具类

2019-05-26 20:10:20 浏览数 (1)

代码语言:javascript复制
package cn.hljmobile.tagcloud.service.data.repository;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.log4j.Logger;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Repository;

//https://github.com/alibaba/simplehbase
//https://zhuanlan.zhihu.com/p/22742173
@Repository
public class HbaseQueryDao {

	Logger logger = Logger.getLogger(HbaseQueryDao.class); 
	
	private String columnFamily = "columns";

	public String getColumnFamily() {
		return columnFamily;
	}

	public void setColumnFamily(String columnFamily) {
		this.columnFamily = columnFamily;
	}

	@Autowired
	private HbaseTemplate hbaseTemplate;

	public <T> T queryForBeanByRowKey(String tableName, String rowKey, final Class<T> beanType) {
		
		logger.info("-----------------------HbaseQueryDao.queryForBeanByRowKey-----------------------------------");
		logger.info("tableName:"   tableName   ",rowKey:"   rowKey);
		logger.info("-----------------------HbaseQueryDao.queryForBeanByRowKey-----------------------------------");
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<T>() {  
            public T mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes(columnFamily));
            	T t = beanType.newInstance();
            	BeanWrapper beanWrapper = new BeanWrapperImpl(t);
                for(Map.Entry<byte[], byte[]> entry : map.entrySet()){
                	beanWrapper.setPropertyValue(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                }
//                List<Cell> ceList = result.listCells();
//                
//                if (ceList != null && ceList.size() > 0) {  
//                    for (Cell cell : ceList) {
//                    	
//                    	String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),  
//                                cell.getQualifierLength());
//                    	String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
//                    	if (beanWrapper.isWritableProperty(columnName)) {
//                    		beanWrapper.setPropertyValue(columnName,columnValue);
//                    	}
//                    	
//                    }
//                }

                return t;  
            }  
        });  
	}
	
	public Map<String, String> queryForMapByRowKey(String tableName, String rowKey) {
		
		logger.info("-----------------------HbaseQueryDao.queryForMapByRowKey-----------------------------------");
		logger.info("tableName:"   tableName   ",rowKey:"   rowKey);
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, String>>() {  
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  

            	/*Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
            	Map<String, String> map = new HashMap<String, String>();
                for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                	map.put(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                }*/
                
                Map<String, String> map = new HashMap<String, String>();
            	if(result.listCells()!=null && result.listCells().size()>0){  
            		Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
            		for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                    	map.put(Bytes.toString(entry.getKey()),Bytes.toString(entry.getValue()));
                    }
            	}
                return map;  
            }  
        });  
	}

	public List<Map<String, String>> queryForListByScanRange(String tableName, String startRow, String stopRow) {
		
		logger.info("-----------------------HbaseQueryDao.queryForListByScanRange-----------------------------------");
		logger.info("tableName:"   tableName   ",startRow:"   startRow  ",stopRow:"   stopRow);
		
		Scan scan = new Scan();  
		if (startRow != null) {
			scan.setStartRow(Bytes.toBytes(startRow));
        } else {
        	scan.setStartRow(Bytes.toBytes(""));
        }
        if (stopRow != null) {
        	scan.setStopRow(Bytes.toBytes(stopRow));
        } else {
        	scan.setStopRow(Bytes.toBytes(""));
        }
        
        Filter pf = new PrefixFilter(Bytes.toBytes(startRow));
        scan.setFilter(pf);

        //scan.setBatch(batch)
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
//	                    System.out.println(family "_" quali "_"  value);  
	                }
	            	map.put("rowkey",rowkey);
            	}
                /*
                 Map<byte[], byte[]> mapColumn = result.getFamilyMap(Bytes.toBytes(columnFamily));
                 for(Map.Entry<byte[], byte[]> entry : mapColumn.entrySet()){
                	map.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue()));
                }*/
                return map;  
            }  
        });  
	}
	//用正则表达式模糊查询
	public List<Map<String, String>> queryForListByRegex(String tableName,String regex) {
		logger.info("-----------------------HbaseQueryDao.queryForListByRegex-----------------------------");
		logger.info("tableName:"   tableName   ",rowKey:"   regex);
		Scan scan = new Scan();  
		/*List<Filter> filters = new ArrayList<Filter>();   
		Filter filter1 = new RowFilter(CompareOp.EQUAL,new RegexStringComparator("kpi")); 
		filters.add(filter1); 
		Filter filter2 = new RowFilter(CompareOp.EQUAL,new RegexStringComparator("kqi")); 
		filters.add(filter2); 
		FilterList filterList = new FilterList(filters); 
		scan.setFilter(filterList);*/
		Filter filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(regex)); 
		scan.setFilter(filter);
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
	                }
	            	map.put("rowkey",rowkey);
            	}
                return map;  
            }  
        });  
	}
	//查询除了regex以外的值
	public List<Map<String, String>> queryForListNotByRegex(String tableName,String regex) {
		logger.info("-----------------------HbaseQueryDao.queryForListByRegex-----------------------------");
		logger.info("tableName:"   tableName   ",rowKey:"   regex);
		Scan scan = new Scan();  
		Filter filter = new RowFilter(CompareOp.NOT_EQUAL,new RegexStringComparator(regex)); 
		scan.setFilter(filter);
        return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, String>>() {
			
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {  
            	
            	Map<String, String> map = new HashMap<String, String>();
            	String  rowkey = "";
            	if(result.listCells()!=null && result.listCells().size()>0){  
	            	for (Cell cell : result.listCells()) {  
	            		rowkey =Bytes.toString( cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());  
	            		String value =Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());  
	                    String family =  Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());  
	                    String quali = Bytes.toString( cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());  
	                    if(columnFamily.equals(family)) {
	                    	map.put(quali,value);
	                    }
	                }
	            	map.put("rowkey",rowkey);
            	}
                return map;  
            }  
        });  
	}
	
	
	
	

	public void save(String tableName, String rowKey, Map<String,String> rowMap) {

		for (Entry<String, String> en : rowMap.entrySet()) {
			hbaseTemplate.put(tableName, rowKey, columnFamily, en.getKey(), Bytes.toBytes(en.getValue()));
		}
	}
	
	public void delete(String tableName, final String rowKey) {
		
		  hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {  
		        public Boolean doInTable(HTableInterface table) throws Throwable {  
		            boolean flag = false;  
		            try{  
		                List<Delete> list = new ArrayList<Delete>();  
		                Delete d1 = new Delete(rowKey.getBytes());  
		                list.add(d1);  
		            	table.delete(list); 
		             flag = true;  
		            }catch(Exception e){  
		                e.printStackTrace();  
		            }  
		            return flag;  
		        }  
		    });  
	}
	
}

0 人点赞