HBase BulkLoad 原理及批量写入数据实战

2022-12-05 09:11:30 浏览数 (2)

目录

一、HBase BulkLoad介绍

  1. 前言
  2. 为什么要用bulkload方式导入?
  3. bulkload的实现原理

二、HBase BulkLoad批量写入数据实战

  1. 开发生成HFile文件的代码
  2. 打成jar包提交到集群中运行
  3. 观察HDFS上输出的结果
  4. 加载HFile文件到hbase表中
  5. 总结

一、HBase BulkLoad介绍

1. 前言

之前我们介绍了HBASE的存储机制,HBASE存储数据其底层使用的是HDFS来作为存储介质,HBASE的每一张表对应的HDFS目录上的一个文件夹,文件夹名是以HBASE表的名字来命名(如果没有使用命名空间,那么默认是在default目录下)。在表文件夹下存放着若干个region命名的文件夹,而region文件夹中的每个列族也是用文件夹进行存储的,每个列族中存储的就是实际的数据,以HFile的形式存在。

路径格式: /hbase/data/default/<tbl_name>/<region_id>//<hfile_id>

2. 为什么要用bulkload方式导入?

在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。

3. bulkload的实现原理

按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后再通过RegionServer将HFile数据文件移动到相应的Region上去。

  • HBase数据正常写流程
  • bulkload方式的处理示意图
  • bulkload的好处
  1. 导入过程不占用Region资源
  2. 能快速导入海量的数据
  3. 节省内存

二、HBase BulkLoad批量写入数据实战

需求

通过bulkload的方式,将我们放在HDFS上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser2这张Hbase表里面去。

1. 开发生成HFile文件的代码

自定义map类

代码语言:javascript复制
package xsluo.hbase;
 
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
 
//四个泛型中后两个,分别对应rowkey及put
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("t");
 
        //封装输出的rowkey类型
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(split[0].getBytes());
        //构建Put对象
        Put put = new Put(split[0].getBytes());
        put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
        put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
 
        context.write(immutableBytesWritable, put);
    }
}

程序main

代码语言:javascript复制
package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class HBaseBulkLoad extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        //设置ZK集群
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
 
        int run = ToolRunner.run(configuration, new HBaseBulkLoad(), args);
        System.exit(run);
    }
 
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseBulkLoad.class);
 
        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
 
        //获取数据库连接
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));
 
        //使MR可以向myuser2表中,增量增加数据
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        //数据写回到HDFS,写成HFile -> 所以指定输出格式为HFileOutputFormat2
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/out_hfile"));
 
        //开始执行
        boolean b = job.waitForCompletion(true);
 
        return b?0:1;
    }
}
2. 打成jar包提交到集群中运行
代码语言:javascript复制
hadoop jar hbase_demo-1.0-SNAPSHOT.jar xsluo.hbase.HBaseBulkLoad
3. 观察HDFS上输出的结果
4. 加载HFile文件到hbase表中

方式一:代码加载

代码语言:javascript复制
package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 
public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
        //获取数据库连接
        Connection connection =  ConnectionFactory.createConnection(configuration);
        //获取表的管理器对象
        Admin admin = connection.getAdmin();
        //获取table对象
        TableName tableName = TableName.valueOf("myuser2");
        Table table = connection.getTable(tableName);
        //构建LoadIncrementalHFiles加载HFile文件
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node01:8020/hbase/out_hfile"), admin,table,connection.getRegionLocator(tableName));
    }
}

方式二:命令加载

先将hbase的jar包添加到hadoop的classpath路径下

代码语言:javascript复制
export HBASE_HOME=/xsluo/install/hbase-1.2.0-cdh5.14.2/
export HADOOP_HOME=/xsluo/install/hadoop-2.6.0-cdh5.14.2/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
  • 运行命令
代码语言:javascript复制
yarn jar /xsluo/install/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar   completebulkload /hbase/out_hfile myuser2

加载完后,原文件会被移动到hdfs中此表对应的region的列族的目录下

5. 总结

本文为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。

0 人点赞