目录
一、HBase BulkLoad介绍
- 前言
- 为什么要用bulkload方式导入?
- bulkload的实现原理
二、HBase BulkLoad批量写入数据实战
- 开发生成HFile文件的代码
- 打成jar包提交到集群中运行
- 观察HDFS上输出的结果
- 加载HFile文件到hbase表中
- 总结
一、HBase BulkLoad介绍
1. 前言
之前我们介绍了HBASE的存储机制,HBASE存储数据其底层使用的是HDFS来作为存储介质,HBASE的每一张表对应的HDFS目录上的一个文件夹,文件夹名是以HBASE表的名字来命名(如果没有使用命名空间,那么默认是在default目录下)。在表文件夹下存放着若干个region命名的文件夹,而region文件夹中的每个列族也是用文件夹进行存储的,每个列族中存储的就是实际的数据,以HFile的形式存在。
路径格式: /hbase/data/default/<tbl_name>/<region_id>//<hfile_id>
2. 为什么要用bulkload方式导入?
在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。
3. bulkload的实现原理
按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后再通过RegionServer将HFile数据文件移动到相应的Region上去。
- HBase数据正常写流程
- bulkload方式的处理示意图
- bulkload的好处
- 导入过程不占用Region资源
- 能快速导入海量的数据
- 节省内存
二、HBase BulkLoad批量写入数据实战
需求
通过bulkload的方式,将我们放在HDFS上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser2这张Hbase表里面去。
1. 开发生成HFile文件的代码
自定义map类
代码语言:javascript复制package xsluo.hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//四个泛型中后两个,分别对应rowkey及put
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("t");
//封装输出的rowkey类型
ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(split[0].getBytes());
//构建Put对象
Put put = new Put(split[0].getBytes());
put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
context.write(immutableBytesWritable, put);
}
}
程序main
代码语言:javascript复制package xsluo.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseBulkLoad extends Configured implements Tool {
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
//设置ZK集群
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
int run = ToolRunner.run(configuration, new HBaseBulkLoad(), args);
System.exit(run);
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
job.setJarByClass(HBaseBulkLoad.class);
FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//获取数据库连接
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("myuser2"));
//使MR可以向myuser2表中,增量增加数据
HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
//数据写回到HDFS,写成HFile -> 所以指定输出格式为HFileOutputFormat2
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/out_hfile"));
//开始执行
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
2. 打成jar包提交到集群中运行
代码语言:javascript复制hadoop jar hbase_demo-1.0-SNAPSHOT.jar xsluo.hbase.HBaseBulkLoad
3. 观察HDFS上输出的结果
4. 加载HFile文件到hbase表中
方式一:代码加载
代码语言:javascript复制package xsluo.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
public class LoadData {
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
//获取数据库连接
Connection connection = ConnectionFactory.createConnection(configuration);
//获取表的管理器对象
Admin admin = connection.getAdmin();
//获取table对象
TableName tableName = TableName.valueOf("myuser2");
Table table = connection.getTable(tableName);
//构建LoadIncrementalHFiles加载HFile文件
LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
load.doBulkLoad(new Path("hdfs://node01:8020/hbase/out_hfile"), admin,table,connection.getRegionLocator(tableName));
}
}
方式二:命令加载
先将hbase的jar包添加到hadoop的classpath路径下
代码语言:javascript复制export HBASE_HOME=/xsluo/install/hbase-1.2.0-cdh5.14.2/
export HADOOP_HOME=/xsluo/install/hadoop-2.6.0-cdh5.14.2/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
- 运行命令
yarn jar /xsluo/install/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar completebulkload /hbase/out_hfile myuser2
加载完后,原文件会被移动到hdfs中此表对应的region的列族的目录下
5. 总结
本文为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。