Flink写出数据到HBase的Sink实现

2020-08-11 11:20:33 浏览数 (1)

文章目录
  • 一、MyHbaseSink
    • 1、继承RichSinkFunction<输入的数据类型>类
    • 2、实现open方法,创建连接对象
    • 3、实现invoke方法,批次写入数据到Hbase
    • 4、实现close方法,关闭连接
  • 二、HBaseUtil工具类

一、MyHbaseSink

1、继承RichSinkFunction<输入的数据类型>类
代码语言:javascript复制
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {
    private transient Integer maxSize = 1000;
    private transient Long delayTime = 5000L;

    public MyHbaseSink() {
    }

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    private transient Connection connection;
    private transient Long lastInvokeTime;
    private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
代码语言:javascript复制
 // 创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 获取全局配置文件,并转为ParameterTool
        ParameterTool params =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        //创建一个Hbase的连接
        connection = HBaseUtil.getConnection(
                params.getRequired("hbase.zookeeper.quorum"),
                params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );
        // 获取系统当前时间
        lastInvokeTime = System.currentTimeMillis();
    }
3、实现invoke方法,批次写入数据到Hbase
代码语言:javascript复制
 @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        String rk = value.f0;
        //创建put对象,并赋rk值
        Put put = new Put(rk.getBytes());
        // 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
        put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
        puts.add(put);// 添加put对象到list集合
        //使用ProcessingTime
        long currentTime = System.currentTimeMillis();
        //开始批次提交数据
        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
            //获取一个Hbase表
            Table table = connection.getTable(TableName.valueOf("database:table"));
            table.put(puts);//批次提交
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
        }
    }
4、实现close方法,关闭连接
代码语言:javascript复制
    @Override
    public void close() throws Exception {
        connection.close();
    }

二、HBaseUtil工具类

  • Hbase的工具类,用来创建Hbase的Connection
代码语言:javascript复制
public class HBaseUtil {
    /**
     * @param zkQuorum zookeeper地址,多个要用逗号分隔
     * @param port     zookeeper端口号
     * @return connection
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port   "");

        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}

0 人点赞