文章目录- 一、MyHbaseSink
- 1、继承RichSinkFunction<输入的数据类型>类
- 2、实现open方法,创建连接对象
- 3、实现invoke方法,批次写入数据到Hbase
- 4、实现close方法,关闭连接
- 二、HBaseUtil工具类
一、MyHbaseSink
1、继承RichSinkFunction<输入的数据类型>类
代码语言:javascript
复制public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {
private transient Integer maxSize = 1000;
private transient Long delayTime = 5000L;
public MyHbaseSink() {
}
public MyHbaseSink(Integer maxSize, Long delayTime) {
this.maxSize = maxSize;
this.delayTime = delayTime;
}
private transient Connection connection;
private transient Long lastInvokeTime;
private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
代码语言:javascript
复制 // 创建连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取全局配置文件,并转为ParameterTool
ParameterTool params =
(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//创建一个Hbase的连接
connection = HBaseUtil.getConnection(
params.getRequired("hbase.zookeeper.quorum"),
params.getInt("hbase.zookeeper.property.clientPort", 2181)
);
// 获取系统当前时间
lastInvokeTime = System.currentTimeMillis();
}
3、实现invoke方法,批次写入数据到Hbase
代码语言:javascript
复制 @Override
public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
String rk = value.f0;
//创建put对象,并赋rk值
Put put = new Put(rk.getBytes());
// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
puts.add(put);// 添加put对象到list集合
//使用ProcessingTime
long currentTime = System.currentTimeMillis();
//开始批次提交数据
if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
//获取一个Hbase表
Table table = connection.getTable(TableName.valueOf("database:table"));
table.put(puts);//批次提交
puts.clear();
lastInvokeTime = currentTime;
table.close();
}
}
4、实现close方法,关闭连接
代码语言:javascript
复制 @Override
public void close() throws Exception {
connection.close();
}
二、HBaseUtil工具类
- Hbase的工具类,用来创建Hbase的Connection
代码语言:javascript
复制public class HBaseUtil {
/**
* @param zkQuorum zookeeper地址,多个要用逗号分隔
* @param port zookeeper端口号
* @return connection
*/
public static Connection getConnection(String zkQuorum, int port) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkQuorum);
conf.set("hbase.zookeeper.property.clientPort", port "");
Connection connection = ConnectionFactory.createConnection(conf);
return connection;
}
}