昨天群里有人问 Flink 1.12 读取Hbase的问题,于是看到这篇文章分享给大家。本文作者Ashiamd。
1. 环境
废话不多说,这里用到的环境如下(不确定是否都必要,但是至少我是这个环境)
- zookeeper 3.6.2
- Hbase 2.4.0
- Flink 1.12.1
2. HBase表
# 创建表
create 'u_m_01' , 'u_m_r'
# 插入数据
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'
3. pom依赖
- jdk1.8
- Flink1.12.1 使用的pom依赖如下(有些是多余的)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-hive-hbase</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.12.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<hive.version>3.1.2</hive.version>
<mysql.version>8.0.19</mysql.version>
<hbase.version>2.4.0</hbase.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- HBase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- <!– JDBC –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- <!– mysql –>-->
<!-- <dependency>-->
<!-- <groupId>mysql</groupId>-->
<!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- <version>${mysql.version}</version>-->
<!-- </dependency>-->
<!-- <!– Hive Dependency –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.hive</groupId>-->
<!-- <artifactId>hive-exec</artifactId>-->
<!-- <version>${hive.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- csv -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <!– Lombok –>-->
<!-- <dependency>-->
<!-- <groupId>org.projectlombok</groupId>-->
<!-- <artifactId>lombok</artifactId>-->
<!-- <version>1.18.18</version>-->
<!-- </dependency>-->
</dependencies>
</project>
4. Flink-Java代码
用到的pojo类
package entity;
import java.io.Serializable;
/**
 * Serializable value object pairing a user id and a movie id with a rating.
 *
 * <p>The property is spelled {@code ratting} throughout; the spelling is kept
 * because it is part of the public accessor names.
 */
public class UserMovie implements Serializable {

    private static final long serialVersionUID = 256158274329337559L;

    private String userId;
    private String movieId;
    private Double ratting;

    /** Creates an instance with all fields unset ({@code null}). */
    public UserMovie() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param userId  identifier of the user
     * @param movieId identifier of the movie
     * @param ratting rating value
     */
    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    /**
     * Returns the serialization version of this class.
     *
     * @return the value of {@code serialVersionUID}
     */
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    /** @return the user id, or {@code null} if unset */
    public String getUserId() {
        return userId;
    }

    /** @param userId the user id to store */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return the movie id, or {@code null} if unset */
    public String getMovieId() {
        return movieId;
    }

    /** @param movieId the movie id to store */
    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    /** @return the rating, or {@code null} if unset */
    public Double getRatting() {
        return ratting;
    }

    /** @param ratting the rating to store */
    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    /**
     * Renders the object as
     * {@code UserMovie{userId='...', movieId='...', ratting=...}}.
     */
    @Override
    public String toString() {
        return "UserMovie{"
                + "userId='" + userId + '\''
                + ", movieId='" + movieId + '\''
                + ", ratting=" + ratting
                + '}';
    }
}
实际测试代码
package hbase;
import com.nimbusds.jose.util.IntegerUtils;
import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class HBaseTest_01 {
public static void main(String[] args) throws Exception {
// 批执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 表环境
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 创建用户-电影表 u_m
TableResult tableResult = tableEnv.executeSql(
"CREATE TABLE u_m ("
" rowkey STRING,"
" u_m_r ROW<r STRING>,"
" PRIMARY KEY (rowkey) NOT ENFORCED"
" ) WITH ("
" 'connector' = 'hbase-2.2' ,"
" 'table-name' = 'default:u_m_01' ,"
" 'zookeeper.quorum' = '127.0.0.1:2181'"
" )");
// 查询是否能获取到HBase里的数据
// Table table = tableEnv.sqlQuery("SELECT rowkey, u_m_r FROM u_m");
// 相当于 scan
Table table = tableEnv.sqlQuery("SELECT * FROM u_m");
// 查询的结果
TableResult executeResult = table.execute();
// 获取查询结果
CloseableIterator<Row> collect = executeResult.collect();
// 输出 (执行print或者下面的 Consumer之后,数据就被消费了。两个只能留下一个)
executeResult.print();
List<UserMovie> userMovieList = new ArrayList<>();
collect.forEachRemaining(new Consumer<Row>() {
@Override
public void accept(Row row) {
String field0 = String.valueOf(row.getField(0));
String[] user_movie = field0.split(",");
Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
userMovieList.add(new UserMovie(user_movie[0],user_movie[1],ratting));
}
});
System.out.println("................");
for(UserMovie um : userMovieList){
System.out.println(um);
}
}
}
5. 输出
- 没有注释掉第59行代码 executeResult.print(); 时
+--------------------------------+--------------------------------+
| rowkey | u_m_r |
+--------------------------------+--------------------------------+
| a,A | 1 |
| a,B | 3 |
| b,B | 3 |
| b,C | 4 |
| c,A | 2 |
| c,C | 5 |
| c,D | 1 |
| d,B | 5 |
| d,D | 2 |
| e,A | 3 |
| e,B | 2 |
| f,A | 1 |
| f,B | 2 |
| f,D | 3 |
| g,C | 1 |
| g,D | 4 |
| h,A | 1 |
| h,B | 2 |
| h,C | 4 |
| h,D | 5 |
+--------------------------------+--------------------------------+
20 rows in set
................
- 注释掉第59行代码 executeResult.print(); 时
................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='C', ratting=4.0}
UserMovie{userId='c', movieId='A', ratting=2.0}
UserMovie{userId='c', movieId='C', ratting=5.0}
UserMovie{userId='c', movieId='D', ratting=1.0}
UserMovie{userId='d', movieId='B', ratting=5.0}
UserMovie{userId='d', movieId='D', ratting=2.0}
UserMovie{userId='e', movieId='A', ratting=3.0}
UserMovie{userId='e', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='A', ratting=1.0}
UserMovie{userId='f', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='D', ratting=3.0}
UserMovie{userId='g', movieId='C', ratting=1.0}
UserMovie{userId='g', movieId='D', ratting=4.0}
UserMovie{userId='h', movieId='A', ratting=1.0}
UserMovie{userId='h', movieId='B', ratting=2.0}
UserMovie{userId='h', movieId='C', ratting=4.0}
UserMovie{userId='h', movieId='D', ratting=5.0}
注意
这里我们在Flink SQL里面定义HBase的Table时,指定的字段都是用的STRING类型。虽然评分本来应该是INT,但是用INT的时候报错了,改成STRING就ok了。