项目使用Hbase进行数据快速查询的代码案例

2019-07-08 00:37:01 浏览数 (1)

之前项目中对于数据详情的查询使用的是ddb技术,由于成本过高,现考虑改用开源的hbase框架,借此机会记录hbase的代码案例。之前已经对hbase的原理进行过介绍,包括hbase中的rowkey、列、列族以及存储原理等,可以参考之前的博客,本文只针对hbase的java Api进行分析。

一、连接配置,拿到 connection 

代码语言:java
    /**
     * HBase client configuration and the connection created from it.
     * Both are instance fields (not static).
     */
    private Configuration conf = null;
    private Connection connection = null;

    /**
     * Creates the service and eagerly opens an HBase connection from the given configuration.
     * If the connection cannot be opened, the failure is logged (with its cause) and
     * {@code connection} stays {@code null}.
     *
     * @param conf HBase client configuration used to create the connection
     */
    public HBaseService(Configuration conf)
    {
        this.conf = conf;
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Hand the exception to the logger so the root cause is not lost.
            log.error("获取HBase连接失败", e);
        }
    }
代码语言:java
   /** Injected from the {@code HBase.nodes} property; used as the {@code hbase.zookeeper.quorum} setting. */
   @Value("${HBase.nodes}")
    private String nodes;

    /** Injected from the {@code HBase.maxsize} property; used as the {@code hbase.client.keyvalue.maxsize} setting. */
    @Value("${HBase.maxsize}")
    private String maxsize;

    /**
     * Builds the {@link HBaseService} bean from the injected ZooKeeper quorum and
     * key-value size limit plus a fixed set of client timeouts and retry settings.
     *
     * @return a service wrapping a connection created from this configuration
     */
    @Bean
    public HBaseService getHbaseService(){
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",nodes );
        conf.set("hbase.client.keyvalue.maxsize",maxsize);
        conf.setInt("hbase.rpc.timeout", 20000);
        conf.setInt("hbase.client.operation.timeout", 30000);
        // The scanner timeout used to be set twice (60000 via the HConstants key, then
        // 20000 via the identical string key). Only the last write took effect, so it is
        // now set once with that effective value.
        conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 20000);
        conf.setInt("hbase.client.pause", 50);
        conf.setInt("hbase.client.retries.number", 15);

        return new HBaseService(conf);
    }

创建表:

代码语言:java
  /**
     * Creates a table with the given column families unless a table of that name
     * already exists.
     *
     * @param tableName    name of the table
     * @param columnFamily names of the column families the table should have
     * @return {@code false} when an {@link IOException} occurred; {@code true} otherwise,
     *         including the case where the table was already present
     */
    public boolean creatTable(String tableName, List<String> columnFamily)
    {
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            // A table may contain several column families: build one descriptor per name.
            List<ColumnFamilyDescriptor> descriptors = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(family ->
                    descriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build()));

            TableName name = TableName.valueOf(tableName);
            TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(name)
                    .setColumnFamilies(descriptors)
                    .build();

            if (!admin.tableExists(name)) {
                admin.createTable(descriptor);
                log.debug("create table Success!");
            } else {
                log.debug("table Exists!");
            }
            return true;
        } catch (IOException e) {
            String message = MessageFormat.format("创建表{0}失败",tableName);
            log.error(message, e);
            return false;
        } finally {
            close(admin, null, null);
        }
    }

    /**
     * 预分区创建表
     * @param tableName 表名
     * @param columnFamily 列族名的集合
     * @param splitKeys 预分期region
     * @return 是否创建成功
     */
    public boolean createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
        Admin admin = null;
        try {
            if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.size() == 0)
            {
                log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
                return false;
            }
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            } else {
                List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());

                for(String cf : columnFamily)
                {
                    familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
                }

                TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamilies(familyDescriptors).build();

                //指定splitkeys
                admin.createTable(tableDescriptor, splitKeys);
                log.info("===Create Table "   tableName   " Success!columnFamily:"   columnFamily.toString()   "===");
            }
        } catch (IOException e) {
            log.error("",e);
            return false;
        }finally {
            close(admin,null,null);
        }

        return true;
    }

    /**
     * 自定义获取分区splitKeys
     */
    public static byte[][] getSplitKeys(String[] keys){
        if(keys==null){
            //默认为10个分区
            keys = new String[] {  "1|", "2|", "3|", "4|", "5|", "6|", "7|", "8|", "9|" };
        }
        byte[][] splitKeys = new byte[keys.length][];
        //升序排序
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for(String key : keys){
            rows.add(Bytes.toBytes(key));
        }

        Iterator<byte[]> rowKeyIter = rows.iterator();
        int i=0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            splitKeys[i] = tempRow;
            i  ;
        }
        return splitKeys;
    }

增加、更新数据:

代码语言:java
 /**
     * Adds or updates several columns of one row in one column family.
     * Opens the table, delegates the write to {@code putDataIntoHbase}, and always
     * closes the table afterwards; any exception is logged, not rethrown.
     *
     * @param tableName  name of the table
     * @param rowKey     row key
     * @param familyName column family name
     * @param columns    column names
     * @param values     column values, parallel to {@code columns}
     */
    public void putData(String tableName,String rowKey, String familyName, String[] columns, String[] values) {
        Table target = null;
        try {
            target = getTable(tableName);
            putDataIntoHbase(target, rowKey, tableName, familyName, columns, values);
        } catch (Exception ex) {
            String message = MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                    , tableName, rowKey, familyName);
            log.error(message, ex);
        } finally {
            close(null, null, target);
        }
    }

    /**
     * 为表添加 or 更新数据  -- 多列
     * @author gxy
     * @date 2018/7/3 17:26
     * @since 1.0.0
     * @param table Table
     * @param rowKey rowKey
     * @param tableName 表名
     * @param familyName 列族名
     * @param columns 列名数组
     * @param values 列值得数组
     */
    private void putDataIntoHbase(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) {
        try {
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));

            if(columns != null && values != null && columns.length == values.length){
                for(int i=0;i<columns.length;i  ){
                    if(columns[i] != null && values[i] != null){
                        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                    }else{
                        throw new NullPointerException(MessageFormat.format("列名和列数据都不能为空,column:{0},value:{1}"
                                ,columns[i],values[i]));
                    }
                }
            }

            table.put(put);
            log.debug("putData add or update data Success,rowKey:"   rowKey);
            table.close();
        } catch (Exception e) {
            log.error(MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                    ,tableName,rowKey,familyName),e);
        }
    }

    /**
     * Writes a single cell (one column of one row).
     * An {@link IOException} is logged and not rethrown; the table is always closed.
     *
     * @param tableName  name of the table
     * @param rowKey     row key
     * @param familyName column family name
     * @param column1    column name
     * @param value1     column value
     */
    public void setColumnValue(String tableName, String rowKey, String familyName, String column1, String value1){
        Table target = null;
        try {
            target = getTable(tableName);

            // One Put for the row, carrying exactly one column.
            Put singleCell = new Put(Bytes.toBytes(rowKey));
            byte[] family = Bytes.toBytes(familyName);
            byte[] qualifier = Bytes.toBytes(column1);
            byte[] value = Bytes.toBytes(value1);
            singleCell.addColumn(family, qualifier, value);

            target.put(singleCell);
            log.debug("add data Success!");
        } catch (IOException ex) {
            String message = MessageFormat.format("为表的某个单元格赋值失败,tableName:{0},rowKey:{1},familyName:{2},column:{3}"
                    , tableName, rowKey, familyName, column1);
            log.error(message, ex);
        } finally {
            close(null, null, target);
        }
    }

删除数据:

代码语言:java
 /**
     * Deletes one column (all of its versions) from a row, if the table exists.
     *
     * @param tableName  name of the table
     * @param rowKey     row key
     * @param familyName column family name
     * @param columnName column name
     * @return {@code false} when an {@link IOException} occurred; {@code true} otherwise,
     *         including the case where the table does not exist
     */
    public boolean deleteColumn(String tableName, String rowKey, String familyName, String columnName)
    {
        Table target = null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            // Nothing to delete when the table is missing.
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            }

            target = getTable(tableName);
            Delete removal = new Delete(Bytes.toBytes(rowKey));
            // Narrow the delete down to the single column.
            removal.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName));

            target.delete(removal);
            log.debug(MessageFormat.format("familyName({0}):columnName({1})is deleted!",familyName,columnName));
            return true;
        } catch (IOException ex) {
            String message = MessageFormat.format("删除指定的列失败,tableName:{0},rowKey:{1},familyName:{2},column:{3}"
                    , tableName, rowKey, familyName, columnName);
            log.error(message, ex);
            return false;
        } finally {
            close(admin, null, target);
        }
    }

    /**
     * Deletes the whole row identified by {@code rowKey}, if the table exists.
     *
     * @param tableName name of the table
     * @param rowKey    row key
     * @return {@code false} when an {@link IOException} occurred; {@code true} otherwise,
     *         including the case where the table does not exist
     */
    public boolean deleteRow(String tableName, String rowKey) {
        Table target = null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            // Nothing to delete when the table is missing.
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            }

            target = getTable(tableName);
            // A Delete carrying only the row key removes the entire row.
            target.delete(new Delete(Bytes.toBytes(rowKey)));
            log.debug(MessageFormat.format("row({0}) is deleted!",rowKey));
            return true;
        } catch (IOException ex) {
            String message = MessageFormat.format("删除指定的行失败,tableName:{0},rowKey:{1}"
                    , tableName, rowKey);
            log.error(message, ex);
            return false;
        } finally {
            close(admin, null, target);
        }
    }

    /**
     * Removes a column family from a table, if the table exists.
     *
     * @param tableName    name of the table
     * @param columnFamily name of the column family to remove
     * @return {@code false} when an {@link IOException} occurred; {@code true} otherwise,
     *         including the case where the table does not exist
     */
    public boolean deleteColumnFamily(String tableName, String columnFamily) {
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                return true;
            }

            // Drop the named column family of this table.
            admin.deleteColumnFamily(name, Bytes.toBytes(columnFamily));
            log.debug(MessageFormat.format("familyName({0}) is deleted!",columnFamily));
            return true;
        } catch (IOException ex) {
            String message = MessageFormat.format("删除指定的列族失败,tableName:{0},columnFamily:{1}",tableName,columnFamily);
            log.error(message, ex);
            return false;
        } finally {
            close(admin, null, null);
        }
    }

    /**
     * 删除表
     * @author gxy
     * @date 2018/7/3 18:02
     * @since 1.0.0
     * @param tableName 表名
     */
    public boolean deleteTable(String tableName){
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            if(admin.tableExists(TableName.valueOf(tableName)))
            {
                admin.disableTable(TableName.valueOf(tableName)); //首先disable掉table
                admin.deleteTable(TableName.valueOf(tableName));
                log.debug(tableName   "is deleted!");
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("删除指定的表失败,tableName:{0}"
                    ,tableName),e);
            return false;
        }finally {
            close(admin,null,null);
        }
        return true;
    }

查询数据:

代码语言:java
   /**
     * Looks up a {@code Table} handle on the shared connection.
     *
     * @param tableName name of the table
     * @return the table handle; the caller is expected to close it
     * @throws IOException if the connection cannot provide the table
     */
    private Table getTable(String tableName) throws IOException
    {
        TableName name = TableName.valueOf(tableName);
        return connection.getTable(name);
    }

    /**
     * Lists the names of all tables reported by the cluster's {@code Admin}.
     *
     * @return table names as strings; empty (or partial) when an {@link IOException}
     *         occurred, which is logged and not rethrown
     */
    public List<String> getAllTableNames()
    {
        List<String> names = new ArrayList<>();

        Admin admin = null;
        try {
            admin = connection.getAdmin();
            TableName[] listed = admin.listTableNames();

            for (int idx = 0; idx < listed.length; idx++) {
                names.add(listed[idx].getNameAsString());
            }
        } catch (IOException ex) {
            log.error("获取所有表的表名失败", ex);
        } finally {
            close(admin, null, null);
        }

        return names;
    }

    /**
     * Scans every row of the table with an unrestricted {@code Scan}.
     *
     * @param tableName name of the table
     * @return map of row key to that row's columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScanner(String tableName){
        // An empty Scan carries no row range and no filter.
        return this.queryData(tableName, new Scan());
    }

    /**
     * Scans the rows between a start and a stop row key. The range is applied only
     * when both keys are non-blank; otherwise the whole table is scanned.
     *
     * @param tableName   name of the table
     * @param startRowKey first row key of the range
     * @param stopRowKey  row key at which the scan stops
     * @return map of row key to that row's columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){
        Scan rangeScan = new Scan();

        boolean hasStart = StringUtils.isNoneBlank(startRowKey);
        if (hasStart && StringUtils.isNoneBlank(stopRowKey)) {
            rangeScan.withStartRow(Bytes.toBytes(startRowKey));
            rangeScan.withStopRow(Bytes.toBytes(stopRowKey));
        }

        return this.queryData(tableName, rangeScan);
    }

    /**
     * Returns the rows whose row key starts with {@code prefix}. A blank prefix
     * scans the whole table.
     *
     * @param tableName name of the table
     * @param prefix    row-key prefix to match
     * @return map of row key to that row's columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(prefix)){
            byte[] prefixBytes = Bytes.toBytes(prefix);
            // Start the scan at the prefix: a PrefixFilter alone makes the scan read
            // from the first row of the table, even though rows sorting before the
            // prefix can never match. The returned rows are unchanged.
            scan.withStartRow(prefixBytes);
            scan.setFilter(new PrefixFilter(prefixBytes));
        }

        return this.queryData(tableName,scan);
    }

    /**
     * Scans the table keeping only columns whose name starts with {@code prefix}.
     * A blank prefix applies no filter.
     *
     * @param tableName name of the table
     * @param prefix    column-name prefix to match
     * @return map of row key to that row's matching columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScannerColumnPrefixFilter(String tableName, String prefix){
        Scan filteredScan = new Scan();

        boolean hasPrefix = StringUtils.isNoneBlank(prefix);
        if (hasPrefix) {
            filteredScan.setFilter(new ColumnPrefixFilter(Bytes.toBytes(prefix)));
        }

        return this.queryData(tableName, filteredScan);
    }

    /**
     * Returns the rows whose row key contains {@code keyword}. A blank keyword
     * applies no filter.
     *
     * @param tableName name of the table
     * @param keyword   substring the row key must contain
     * @return map of row key to that row's columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScannerRowFilter(String tableName, String keyword){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(keyword)){
            // SubstringComparator only reports "contains" / "does not contain", so EQUAL
            // is the operator that expresses a substring match (HBase documents only
            // EQUAL and NOT_EQUAL as valid with this comparator).
            Filter filter = new RowFilter(CompareOperator.EQUAL,new SubstringComparator(keyword));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }

    /**
     * Scans the table keeping only columns whose name contains {@code keyword}.
     * A blank keyword applies no filter.
     *
     * @param tableName name of the table
     * @param keyword   substring the column name must contain
     * @return map of row key to that row's matching columns (column name to value)
     */
    public Map<String,Map<String,String>> getResultScannerQualifierFilter(String tableName, String keyword){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(keyword)){
            // SubstringComparator only reports "contains" / "does not contain", so EQUAL
            // is the operator that expresses a substring match (HBase documents only
            // EQUAL and NOT_EQUAL as valid with this comparator).
            Filter filter = new QualifierFilter(CompareOperator.EQUAL,new SubstringComparator(keyword));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }



    /**
     * Runs the given scan against a table and collects the result per row.
     * Column names are keyed without their column family, so equally named columns
     * of different families in one row overwrite each other in the row's map.
     * An {@link IOException} is logged and the rows collected so far are returned.
     *
     * @param tableName name of the table
     * @param scan      scan describing the row range and filters
     * @return map of row key to that row's columns (column name to value)
     */
    private Map<String,Map<String,String>> queryData(String tableName,Scan scan){
        // <rowKey, columns of that row>
        Map<String,Map<String,String>> result = new HashMap<>();

        ResultScanner rs = null;
        Table table= null;
        try {
            table = getTable(tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                // Same guard the single-row readers use: skip results without cells
                // instead of iterating over their cell list.
                if (r == null || r.isEmpty()) {
                    continue;
                }

                // Columns of the current row.
                Map<String,String> columnMap = new HashMap<>();
                String rowKey = null;
                for (Cell cell : r.listCells()) {
                    if(rowKey == null){
                        // All cells of a Result share the row key; read it from the first one.
                        rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
                    }
                    columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }

                if(rowKey != null){
                    result.put(rowKey,columnMap);
                }
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}"
                    ,tableName),e);
        }finally {
            close(null,rs,table);
        }

        return result;
    }

    /**
     * Reads one row by its exact row key.
     * An {@link IOException} is logged and an empty (or partial) map is returned.
     *
     * @param tableName name of the table
     * @param rowKey    row key
     * @return the row's columns (column name to value); empty when the row has no cells
     */
    public Map<String,String> getRowData(String tableName, String rowKey){
        Map<String,String> columns = new HashMap<>();

        Get rowGet = new Get(Bytes.toBytes(rowKey));
        Table target = null;
        try {
            target = getTable(tableName);
            // A Get carrying only the row key fetches the row's cells.
            Result row = target.get(rowGet);
            if (row == null || row.isEmpty()) {
                return columns;
            }
            for (Cell cell : row.listCells()) {
                String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                columns.put(qualifier, value);
            }
        } catch (IOException ex) {
            String message = MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}"
                    , tableName, rowKey);
            log.error(message, ex);
        } finally {
            close(null, null, target);
        }

        return columns;
    }

    /**
     * Reads the latest value of a single cell.
     * An {@link IOException} is logged and {@code null} is returned.
     *
     * @param tableName  name of the table
     * @param rowKey     row key
     * @param familyName column family name
     * @param columnName column name
     * @return the cell value as a string, or {@code null} when the cell does not exist
     *         or the read failed
     */
    public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){
        String str = null;
        byte[] family = Bytes.toBytes(familyName);
        byte[] qualifier = Bytes.toBytes(columnName);

        Get get = new Get(Bytes.toBytes(rowKey));
        // Ask only for the requested column instead of fetching the whole row and
        // picking one cell out of it afterwards.
        get.addColumn(family, qualifier);

        Table table= null;
        try {
            table = getTable(tableName);
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.getColumnLatestCell(family, qualifier);
                if(cell != null){
                    str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
        }finally {
            close(null,null,table);
        }

        return str;
    }

    /**
     * Reads up to {@code versions} versions of a single cell.
     * An {@link IOException} is logged and an empty (or partial) list is returned.
     *
     * @param tableName  name of the table
     * @param rowKey     row key
     * @param familyName column family name
     * @param columnName column name
     * @param versions   number of versions to read
     * @return the cell values in the order the result lists them
     */
    public List<String> getColumnValuesByVersion(String tableName, String rowKey, String familyName, String columnName,int versions) {
        // Clamp the capacity hint: a negative value would throw an uncaught
        // IllegalArgumentException here, before the guarded read is even attempted.
        List<String> result = new ArrayList<>(Math.max(versions, 0));

        Table table= null;
        try {
            table = getTable(tableName);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            // Number of versions to fetch.
            get.readVersions(versions);
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {
                for (Cell cell : hTableResult.listCells()) {
                    result.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("查询指定单元格多个版本的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
        }finally {
            close(null,null,table);
        }

        return result;
    }

关闭流:

代码语言:java
 /**
     * Closes whichever of the given resources are non-null, in the order
     * admin, scanner, table. A failure to close the admin or the table is logged
     * and does not prevent the remaining resources from being closed.
     *
     * @param admin admin handle to close, or {@code null}
     * @param rs    result scanner to close, or {@code null}
     * @param table table handle to close, or {@code null}
     */
    private void close(Admin admin, ResultScanner rs, Table table)
    {
        try {
            if (admin != null) {
                admin.close();
            }
        } catch (IOException ex) {
            log.error("关闭Admin失败", ex);
        }

        if (rs != null) {
            rs.close();
        }

        try {
            if (table != null) {
                table.close();
            }
        } catch (IOException ex) {
            log.error("关闭Table失败", ex);
        }
    }

0 人点赞