实操 | Flink1.12.1通过Table API / Flink SQL读取HBase2.4.0

2021-05-07 10:49:07 浏览数 (1)

昨天群里有人问 Flink 1.12 读取Hbase的问题,于是看到这篇文章分享给大家。本文作者Ashiamd。

1. 环境

废话不多说,这里用到的环境如下(不确定是否都必要,但是至少我是这个环境)

  • zookeeper 3.6.2
  • Hbase 2.4.0
  • Flink 1.12.1
2. HBase表
代码语言:javascript复制
# Create table 'u_m_01' with a single column family 'u_m_r'.
create 'u_m_01', 'u_m_r'

# Seed data: each entry is [row key, rating]. The row key has the form
# "<user>,<movie>" and the rating is stored as a string in column 'u_m_r:r'.
ratings = [
  ['a,A', '1'], ['a,B', '3'],
  ['b,B', '3'], ['b,C', '4'],
  ['c,A', '2'], ['c,C', '5'], ['c,D', '1'],
  ['d,B', '5'], ['d,D', '2'],
  ['e,A', '3'], ['e,B', '2'],
  ['f,A', '1'], ['f,B', '2'], ['f,D', '3'],
  ['g,C', '1'], ['g,D', '4'],
  ['h,A', '1'], ['h,B', '2'], ['h,C', '4'], ['h,D', '5']
]

# Insert the rows in the order listed above (the HBase shell is JRuby,
# so ordinary Ruby iteration works around the `put` command).
ratings.each do |rowkey, rating|
  put 'u_m_01', rowkey, 'u_m_r:r', rating
end
3. pom依赖
  • jdk1.8
  • Flink1.12.1 使用的pom依赖如下(有些是多余的)
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the example that reads an HBase table through the Flink Table API / Flink SQL. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Versions shared by the dependencies below. hive.version and mysql.version are
         referenced only by dependencies that are currently commented out. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>


    <dependencies>

        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- HBase: the Flink connector artifact is named for HBase 2.2; it is combined here
             with the HBase ${hbase.version} server and client libraries. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>


        <!-- JDBC (disabled) -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

        <!-- MySQL driver (disabled) -->
<!--        <dependency>-->
<!--            <groupId>mysql</groupId>-->
<!--            <artifactId>mysql-connector-java</artifactId>-->
<!--            <version>${mysql.version}</version>-->
<!--        </dependency>-->

        <!-- Hive dependencies (disabled) -->
<!--        <dependency>-->
<!--            <groupId>org.apache.hive</groupId>-->
<!--            <artifactId>hive-exec</artifactId>-->
<!--            <version>${hive.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

        <!-- Table API: two planner artifacts are declared.
             NOTE(review): the example code selects the Blink planner, so the second
             (non-blink) planner artifact is presumably redundant; confirm before removing. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Lombok (disabled) -->
<!--        <dependency>-->
<!--            <groupId>org.projectlombok</groupId>-->
<!--            <artifactId>lombok</artifactId>-->
<!--            <version>1.18.18</version>-->
<!--        </dependency>-->

    </dependencies>
</project>
4. Flink-Java代码

用到的pojo类

代码语言:javascript复制
package entity;
import java.io.Serializable;

/**
 * Serializable bean holding one rating that a user gave to a movie.
 *
 * <p>All three properties are nullable and mutable; the class offers a no-arg
 * constructor plus getters and setters so it can be used as a plain POJO.
 */
public class UserMovie implements Serializable {

    private static final long serialVersionUID = 256158274329337559L;

    /** Identifier of the user who rated the movie. */
    private String userId;

    /** Identifier of the rated movie. */
    private String movieId;

    /** The rating value (property name spelling kept for compatibility). */
    private Double ratting;

    /** Creates an instance with every property left {@code null}. */
    public UserMovie() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param userId  identifier of the user
     * @param movieId identifier of the movie
     * @param ratting rating given by the user to the movie
     */
    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    /**
     * @return the serialization version id of this class
     */
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    /**
     * @return the user identifier, possibly {@code null}
     */
    public String getUserId() {
        return userId;
    }

    /**
     * @param userId the user identifier to store
     */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /**
     * @return the movie identifier, possibly {@code null}
     */
    public String getMovieId() {
        return movieId;
    }

    /**
     * @param movieId the movie identifier to store
     */
    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    /**
     * @return the rating, possibly {@code null}
     */
    public Double getRatting() {
        return ratting;
    }

    /**
     * @param ratting the rating to store
     */
    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    /**
     * Renders the bean as {@code UserMovie{userId='..', movieId='..', ratting=..}}.
     *
     * @return a human-readable representation of all three properties
     */
    @Override
    public String toString() {
        // The listing had lost its '+' operators and the '\'' escape; restored here.
        return "UserMovie{"
                + "userId='" + userId + '\''
                + ", movieId='" + movieId + '\''
                + ", ratting=" + ratting
                + '}';
    }
}

实际测试代码

代码语言:javascript复制
package hbase;

import com.nimbusds.jose.util.IntegerUtils;
import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HBaseTest_01   {
    public static void main(String[] args) throws Exception {
        // 批执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 表环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 创建用户-电影表 u_m
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE u_m ("  
                        " rowkey STRING,"  
                        " u_m_r ROW<r STRING>,"  
                        " PRIMARY KEY (rowkey) NOT ENFORCED"  
                        " ) WITH ("  
                        " 'connector' = 'hbase-2.2' ,"  
                        " 'table-name' = 'default:u_m_01' ,"  
                        " 'zookeeper.quorum' = '127.0.0.1:2181'"  
                        " )");

        // 查询是否能获取到HBase里的数据
//        Table table = tableEnv.sqlQuery("SELECT rowkey, u_m_r FROM u_m");

        // 相当于 scan
        Table table = tableEnv.sqlQuery("SELECT * FROM u_m");

        // 查询的结果
        TableResult executeResult = table.execute();

        // 获取查询结果
        CloseableIterator<Row> collect = executeResult.collect();

        // 输出 (执行print或者下面的 Consumer之后,数据就被消费了。两个只能留下一个)
        executeResult.print();

        List<UserMovie> userMovieList = new ArrayList<>();

        collect.forEachRemaining(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                String field0 = String.valueOf(row.getField(0));
                String[] user_movie = field0.split(",");
                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                userMovieList.add(new UserMovie(user_movie[0],user_movie[1],ratting));
            }
        });


        System.out.println("................");

        for(UserMovie um : userMovieList){
            System.out.println(um);
        }
    }
}
5. 输出
  1. 没有注释掉 executeResult.print();(原代码第59行)时:print 先把结果消费完,后面的 userMovieList 为空
代码语言:javascript复制
 -------------------------------- -------------------------------- 
|                         rowkey |                          u_m_r |
 -------------------------------- -------------------------------- 
|                            a,A |                              1 |
|                            a,B |                              3 |
|                            b,B |                              3 |
|                            b,C |                              4 |
|                            c,A |                              2 |
|                            c,C |                              5 |
|                            c,D |                              1 |
|                            d,B |                              5 |
|                            d,D |                              2 |
|                            e,A |                              3 |
|                            e,B |                              2 |
|                            f,A |                              1 |
|                            f,B |                              2 |
|                            f,D |                              3 |
|                            g,C |                              1 |
|                            g,D |                              4 |
|                            h,A |                              1 |
|                            h,B |                              2 |
|                            h,C |                              4 |
|                            h,D |                              5 |
 -------------------------------- -------------------------------- 
20 rows in set
................
  2. 注释掉 executeResult.print();(原代码第59行)时
代码语言:javascript复制
................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='C', ratting=4.0}
UserMovie{userId='c', movieId='A', ratting=2.0}
UserMovie{userId='c', movieId='C', ratting=5.0}
UserMovie{userId='c', movieId='D', ratting=1.0}
UserMovie{userId='d', movieId='B', ratting=5.0}
UserMovie{userId='d', movieId='D', ratting=2.0}
UserMovie{userId='e', movieId='A', ratting=3.0}
UserMovie{userId='e', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='A', ratting=1.0}
UserMovie{userId='f', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='D', ratting=3.0}
UserMovie{userId='g', movieId='C', ratting=1.0}
UserMovie{userId='g', movieId='D', ratting=4.0}
UserMovie{userId='h', movieId='A', ratting=1.0}
UserMovie{userId='h', movieId='B', ratting=2.0}
UserMovie{userId='h', movieId='C', ratting=4.0}
UserMovie{userId='h', movieId='D', ratting=5.0}
注意

这里我们在Flink SQL里面定义HBase的Table时,指定的字段都是用的STRING类型。虽然评分本来应该是INT,但是用INT的时候报错了,改成STRING就ok了。

0 人点赞