温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
Fayson的github:https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
HBase是一款基于Hadoop的Key-Value数据库,提供了对HDFS上数据的高效随机读写服务,填补了Hadoop MapReduce批处理的缺陷,但HBase作为列簇数据库无法轻易的建立“二级索引”、难以执行求和、计数、排序等操作。在HBase0.96版本后引入了协处理器(Coprocessor),用户可以编写运行在HBase Server端的代码。HBase支持两种类型的协处理器,Endpoint和Observer。
Endpoint协处理器类似传统数据库中的存储过程,客户端可以调用这些Endpoint协处理器执行一段Server端代码,并将Server端代码的结果返回给客户端处理。
Observer Coprocessor,这中协处理器类似于传统数据库中的触发器,当发生某些事件的时候,Observer协处理器会被Server端调用。
本篇文章Fayson先不介绍如何去开发协处理器,主要借助于HBase示例中自带的RowCount Endpoint协处理器来说明如何使用Java代码在客户端调用。在后面的文章Fayson会介绍如何去编写一个协处理器。
Endpoint Coprocessor客户端调用过程,如下图所示:
- 内容概述
1.环境准备
2.编写Java示例代码及运行
3.统计方式对比
- 测试环境
1.CM和CDH版本为5.14.3
2.环境准备
HBase中自带的Endpoint的协处理器,在hbase-examples.jar包中,在CDH的/opt/cloudera/parcels/CDH/lib/hbase/lib目录下,如下图所示:
1.确认hbase-examples-1.2.0-cdh5.14.2.jar是否在
代码语言:javascript复制[root@ip-172-31-8-230 lib]# pwd
/opt/cloudera/parcels/CDH/lib/hbase/lib
[root@ip-172-31-8-230 lib]# ll hbase-examples-1.2.0-cdh5.14.2.jar
2.使用HBase的pe生成TestTable测试表及数据
代码语言:javascript复制[root@ip-172-31-8-230 ~]# hbase pe --rows=10000000 randomWrite 1
3.登录Cloudera Manager进入HBase服务进行配置
配置自定义的Endpoint类,因为Endpoint类型的Coprocessor运行在HBase 的RegionServer中,所以这里只需要配置”HBase Coprocessor Region类”
代码语言:javascript复制org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint
注意:在这里的配置为全局配置,协处理器有两种使用方式上图的方式是其中的一种,另外一种则是对单个表进行修改。
3.编写JAVA示例
1.创建HBase的Maven工程
2.工程的pom.xml文件内容如下
代码语言:javascript复制<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cdh-project</artifactId>
<groupId>com.cloudera</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase-demo</artifactId>
<packaging>jar</packaging>
<name>hbase-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-examples</artifactId>
<version>1.2.0-cdh5.11.2</version>
</dependency>
</dependencies>
</project>
3.编写CoprocessorExample.java类,内容如下
代码语言:javascript复制package com.cloudera.hbase.coprocessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* package: com.cloudera.hbase.coprocessor
* describe: 客户端如何调用自定义的corprocessor类,Endpoint类型,该示例代码中介绍了几种调用的方式,以及各种调用方式的效率
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/5/9
* creat_time: 下午11:30
* 公众号:Hadoop实操
*/
public class CoprocessorExample {
public static void main(String[] args) {
//初始化HBase配置
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.setStrings("hbase.zookeeper.quorum", "ip-172-31-5-38.ap-southeast-1.compute.internal,ip-172-31-8-230.ap-southeast-1.compute.internal,ip-172-31-5-171.ap-southeast-1.compute.internal");
try {
//创建一个HBase的Connection
Connection connection = ConnectionFactory.createConnection(configuration);
Table testTable = connection.getTable(TableName.valueOf("TestTable"));
execBatchEndpointCoprocessor(testTable);
execEndpointCoprocessor(testTable);
execFastEndpointCoprocessor(testTable);
//关闭连接
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 使用batchCoprocessorService(MethodDescriptor var1, Message var2, byte[] var3, byte[] var4, R var5)的方法调用
* 使用批量的方式,HBase会自动的将属于同一个RegionServer上的请求打包处理,可以节省网络交互的开销,效率会更高
* @param table HBase表名
* @return 返回表的总条数
*/
public static long execBatchEndpointCoprocessor(Table table) {
byte[] s= Bytes.toBytes("00000000000000000000000000");
byte[] e= Bytes.toBytes("00000000000000000000000010");
long start_t = System.currentTimeMillis();
Map<byte[], ExampleProtos.CountResponse> batchMap = null;
try {
batchMap = table.batchCoprocessorService(
ExampleProtos.RowCountService.getDescriptor().findMethodByName("getKeyValueCount"),
ExampleProtos.CountRequest.getDefaultInstance(),
s,
e,
ExampleProtos.CountResponse.getDefaultInstance());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
long batch_count = 0;
System.out.println("Region Size:" batchMap.size());
for (ExampleProtos.CountResponse response : batchMap.values()) {
batch_count = response.getCount();
}
System.out.println("方式一耗时:" (System.currentTimeMillis() - start_t));
System.out.println("方式一统计数量:" batch_count);
return batch_count;
}
/**
* 通过HBase的coprocessorService(Class, byte[],byte[],Batch.Call)方法获取表的条数
* @param table HBase 表对象
* @return 返回表的条数
*/
public static long execEndpointCoprocessor(Table table) {
try {
long start_t = System.currentTimeMillis();
/**
* coprocessorService(Class, byte[],byte[],Batch.Call)方法描述:
* 参数一:Endpoint Coprocessor类,通过设置Endpoint Coprocessor类可以找到Region相应的协处理器
* 参数二和参数三:要调用哪些Region上的服务则有startkey和endkey来决定,通过rowkey范围可以确定多个Region,如果设置为null则为所有的Region
* 参数四:接口类Batch.Call定义如何调用协处理器,通过重写call()方法实现客户端的逻辑
*
* coprocessorService方法返回的是一个Map对象,Map的Key是Region的名字,Value是Batch.Call.call()方法的返回值
*/
Map<byte[] , Long> map = table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() {
@Override
public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
ExampleProtos.CountRequest requet = ExampleProtos.CountRequest.getDefaultInstance();
BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
rowCountService.getKeyValueCount(null, requet, rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
return response.getCount();
}
});
//对协处理器返回的所有Region的数量累加得出表的总条数
long count = 0;
System.out.println("Region Size:" map.size());
for(Long count_r : map.values()) {
count = count_r;
}
System.out.println("方式二耗时:" (System.currentTimeMillis() - start_t));
System.out.println("方式二统计数量:" count);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return 0l;
}
/**
* 效率最高的方式,在方式二的基础上优化
* 通过HBase的coprocessorService(Class, byte[],byte[],Batch.Call,Callback<R>)方法获取表的总条数
* @param table HBase表名
* @return 返回表的总条数
*/
public static long execFastEndpointCoprocessor(Table table) {
long start_t = System.currentTimeMillis();
//定义总的 rowCount 变量
AtomicLong totalRowCount = new AtomicLong();
try {
Batch.Callback<Long> callback = new Batch.Callback<Long>() {
@Override
public void update(byte[] region, byte[] row, Long result) {
totalRowCount.getAndAdd(result);
}
};
table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() {
@Override
public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
ExampleProtos.CountRequest requet = ExampleProtos.CountRequest.getDefaultInstance();
BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
rowCountService.getKeyValueCount(null, requet, rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
return response.getCount();
}
}, callback);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
System.out.println("方式三耗时:" (System.currentTimeMillis() - start_t));
System.out.println("方式三统计数量:" totalRowCount.longValue());
return totalRowCount.longValue();
}
}
4.示例代码运行
4.HBase表统计效率对比
1.使用HBase的count来统计测试表的总条数
代码语言:javascript复制[root@ip-172-31-8-230 ~]# hbase shell
hbase(main):001:0> count 'TestTable', INTERVAL => 1000000, CACHE => 10000
2.使用HBase提供的MapReduce方式统计测试表的总条数
代码语言:javascript复制[root@ip-172-31-8-230 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter TestTable
查看MapReduce耗时38s
3.使用HBase协处理器执行测试表统计
执行耗时:14.12s
耗时统计:
统计方式 | 统计总量 | 耗时(s) |
---|---|---|
HBase count | 6654040 | 88.9 |
HBase MapReduce | 6654040 | 38 |
HBase协处理器 | 6654040 | 14.12 |
5.总结
- 在使用HBase的coprocessor方法是如果传入startkey和endkey是会根据rowkey的访问检索出符合条件的region并统计每个region上数据量。
- HBase的Endpoint Coprocessor协处理器可以通过CM的方式配置全局的也可以通过客户端或hbase shell的方式来指定某一个表使用比较灵活,在后面的文章Fayson会介绍如何指定单个表的方式。
GitHub地址:
https://github.com/fayson/cdhproject/blob/master/hbasedemo/src/main/java/com/cloudera/hbase/coprocessor/CoprocessorExample.java
参考文档:
https://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor2/
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操