With the development of Flink Table & SQL, dimension table joins in Flink SQL have become the choice for many scenarios.
Building on earlier notes, here is another summary of the ways to implement dimension table joins in Flink Table & SQL, as well as in the DataStream API:
- Periodically loading dimension data
- Distributed Cache
- Async I/O
- Broadcast State
- UDTF + LATERAL TABLE syntax
- LookupableTableSource
Periodically Loading Dimension Data
Implementation
- Implement a RichFlatMapFunction and, in open(), start a thread that periodically reads the dimension data and loads it into memory.
- Perform the dimension join in flatMap().
Code example
package com.bigdata.flink.dimJoin;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.sql.*;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
/**
* Periodically load dimension data into memory
*/
@Slf4j
public class DimRichFlatMapFunction extends RichFlatMapFunction<UserBrowseLog, Tuple2<UserBrowseLog, UserInfo>> {
private final String url;
private final String user;
private final String passwd;
private final Integer reloadInterval;
private Connection connection;
private final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
HashMap<String, UserInfo> dimInfo = new HashMap<>();
public DimRichFlatMapFunction(String url, String user, String passwd, Integer reloadInterval) {
this.url = url;
this.user = user;
this.passwd = passwd;
this.reloadInterval = reloadInterval;
}
/**
* Open the connection and
* set up periodic loading of the dimension data
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(JDBC_DRIVER);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
if (connection == null || connection.isClosed()) {
log.warn("No connection. Trying to reconnect...");
connection = DriverManager.getConnection(url, user, passwd);
}
String sql = "select uid,name,age,address from t_user_info";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
UserInfo userInfo = new UserInfo();
userInfo.setUid(resultSet.getString("uid"));
userInfo.setName(resultSet.getString("name"));
userInfo.setAge(resultSet.getInt("age"));
userInfo.setAddress(resultSet.getString("address"));
dimInfo.put(userInfo.getUid(), userInfo);
}
} catch (SQLException e) {
log.error("Get dimension data exception...", e);
}
}
};
Timer timer = new Timer();
timer.scheduleAtFixedRate(timerTask, 0, reloadInterval * 1000);
}
/**
* Close the connection
*/
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
/**
* Join with the dimension data
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(UserBrowseLog value, Collector<Tuple2<UserBrowseLog, UserInfo>> out) throws Exception {
String userID = value.getUserID();
if (dimInfo.containsKey(userID)) {
UserInfo dim = (UserInfo) dimInfo.get(userID);
out.collect(new Tuple2<>(value, dim));
}
}
}
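A minimal usage sketch for wiring this function into a job (the UserBrowseLogSource and the connection parameters below are placeholders, not part of the original example):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new UserBrowseLogSource()) // hypothetical source emitting UserBrowseLog events
.flatMap(new DimRichFlatMapFunction("jdbc:mysql://localhost:3306/bigdata", "user", "passwd", 60)) // reload every 60 seconds
.print();
env.execute();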
- Notes
- Since the data is held in memory, only small dimension tables are supported.
- Periodic reloading is only suitable for dimension tables that are not updated very frequently.
Distributed Cache
Implementation
- Register a local or HDFS file via env.registerCachedFile(cachedFilePath, cachedFileName).
- When the job starts, Flink automatically distributes the file to the TaskManagers' file systems.
- Implement a RichFlatMapFunction and, in open(), retrieve and parse the cached file through the RuntimeContext.
- The parsed data is kept in memory, so the dimension join can then be done in flatMap().
Code example
package com.bigdata.flink.dimJoin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.File;
import java.util.HashMap;
import java.util.List;
/**
* Dimension join via Distributed Cache
*/
@Slf4j
public class DistributedCacheJoinDim {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Register a cached file, e.g. file:///some/path or hdfs://host:port/and/path
String cachedFilePath = "./user_info.txt";
String cachedFileName = "user_info";
env.registerCachedFile(cachedFilePath, cachedFileName);
// Add the real-time stream
DataStreamSource<Tuple2<String, String>> stream = env.fromElements(
Tuple2.of("1", "click"),
Tuple2.of("2", "click"),
Tuple2.of("3", "browse"));
// Join with the dimension data
SingleOutputStreamOperator<String> dimedStream = stream.flatMap(new RichFlatMapFunction<Tuple2<String, String>, String>() {
HashMap<String, Integer> dimInfo = new HashMap<>();
// Read the cached file
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
File cachedFile = getRuntimeContext().getDistributedCache().getFile(cachedFileName);
List<String> lines = FileUtils.readLines(cachedFile);
for (String line : lines) {
String[] split = line.split(",");
dimInfo.put(split[0], Integer.valueOf(split[1]));
}
}
// Join with the dimension data
@Override
public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
if (dimInfo.containsKey(value.f0)) {
Integer age = (Integer) dimInfo.get(value.f0);
out.collect(value.f0 + "," + value.f1 + "," + age);
}
}
});
dimedStream.print();
env.execute();
}
}
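The example assumes a cached file user_info.txt containing one userID,age pair per line; made-up sample content:
1,22
2,30
3,25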
- Notes
- Since the data is held in memory, only small dimension tables are supported.
- The data is loaded at startup, so the job has to be restarted when the dimension table changes.
Async I/O
Implementation
- The dimension data lives in an external store, such as Elasticsearch, Redis, or HBase.
- Query the dimension data via asynchronous I/O.
- Combine this with a local cache, such as Guava Cache, to reduce access to the external store.
Code example
package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.*;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* Join a stream with a dimension table using Async I/O
*/
public class FlinkAsyncIO {
public static void main(String[] args) throws Exception{
/** Parse command-line arguments */
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaBootstrapServers = parameterTool.get("kafka.bootstrap.servers");
String kafkaGroupID = parameterTool.get("kafka.group.id");
String kafkaAutoOffsetReset= parameterTool.get("kafka.auto.offset.reset");
String kafkaTopic = parameterTool.get("kafka.topic");
int kafkaParallelism =parameterTool.getInt("kafka.parallelism");
String esHost= parameterTool.get("es.host");
Integer esPort= parameterTool.getInt("es.port");
String esUser = parameterTool.get("es.user");
String esPassword = parameterTool.get("es.password");
String esIndex = parameterTool.get("es.index");
String esType = parameterTool.get("es.type");
/** Flink DataStream execution environment */
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT,8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
/** Add the source */
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
kafkaProperties.put("group.id",kafkaGroupID);
kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
SingleOutputStreamOperator<String> source = env.addSource(kafkaConsumer).name("KafkaSource").setParallelism(kafkaParallelism);
// Transform the data
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
Tuple4<String, String, String, Integer> output = new Tuple4<>();
try {
JSONObject obj = JSON.parseObject(value);
output.f0 = obj.getString("userID");
output.f1 = obj.getString("eventTime");
output.f2 = obj.getString("eventType");
output.f3 = obj.getInteger("productID");
} catch (Exception e) {
e.printStackTrace();
}
return output;
}).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){}).name("Map: ExtractTransform");
// Filter out malformed records
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f3 != null).name("Filter: FilterExceptionData");
// Timeout: by default, when an async I/O request times out, an exception is thrown and the job fails or is restarted. To handle timeouts yourself, override AsyncFunction#timeout.
// Capacity: the number of concurrent in-flight requests
/** Join the stream with the dimension table via Async I/O */
SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result = AsyncDataStream.orderedWait(sourceFilter, new ElasticsearchAsyncFunction(esHost,esPort,esUser,esPassword,esIndex,esType), 500, TimeUnit.MILLISECONDS, 10).name("Join: JoinWithDim");
/** Emit results */
result.print().name("PrintToConsole");
env.execute();
}
}
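For reference, the job above reads its settings from command-line arguments via ParameterTool; a possible invocation looks roughly like this (all values are placeholders):
--kafka.bootstrap.servers kafka01:9092 --kafka.group.id c1_test --kafka.auto.offset.reset latest --kafka.topic test_1 --kafka.parallelism 1 --es.host localhost --es.port 9200 --es.user elastic --es.password ****** --es.index dim_user --es.type _doc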
The ElasticsearchAsyncFunction used above:
package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* Custom ElasticsearchAsyncFunction that looks up dimension data in Elasticsearch
*/
public class ElasticsearchAsyncFunction extends RichAsyncFunction<Tuple4<String, String, String, Integer>, Tuple5<String, String, String, Integer,Integer>> {
private String host;
private Integer port;
private String user;
private String password;
private String index;
private String type;
public ElasticsearchAsyncFunction(String host, Integer port, String user, String password, String index, String type) {
this.host = host;
this.port = port;
this.user = user;
this.password = password;
this.index = index;
this.type = type;
}
private RestHighLevelClient restHighLevelClient;
private Cache<String,Integer> cache;
/**
* Establish the connection to Elasticsearch
* @param parameters
*/
@Override
public void open(Configuration parameters){
//ES Client
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
restHighLevelClient = new RestHighLevelClient(
RestClient
.builder(new HttpHost(host, port))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
// Initialize the local cache
cache=CacheBuilder.newBuilder().maximumSize(2).expireAfterAccess(5, TimeUnit.MINUTES).build();
}
/**
* Close the connection
* @throws Exception
*/
@Override
public void close() throws Exception {
restHighLevelClient.close();
}
/**
* Asynchronous invocation
* @param input
* @param resultFuture
*/
@Override
public void asyncInvoke(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
// 1. Try the local cache first
Integer cachedValue = cache.getIfPresent(input.f0);
if(cachedValue !=null){
System.out.println("从缓存中获取到维度数据: key=" input.f0 ",value=" cachedValue);
resultFuture.complete(Collections.singleton(new Tuple5<>(input.f0,input.f1,input.f2,input.f3,cachedValue)));
// 2. On cache miss, query the external store
}else {
searchFromES(input,resultFuture);
}
}
/**
* Query the external store (Elasticsearch) when the cache has no data
* @param input
* @param resultFuture
*/
private void searchFromES(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture){
// 1. Build the output record
Tuple5<String, String, String, Integer, Integer> output = new Tuple5<>();
output.f0=input.f0;
output.f1=input.f1;
output.f2=input.f2;
output.f3=input.f3;
// 2. The key to look up
String dimKey = input.f0;
// 3. Build an IDs query
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.types(type);
searchRequest.source(SearchSourceBuilder.searchSource().query(QueryBuilders.idsQuery().addIds(dimKey)));
// 4. Query with the asynchronous client
restHighLevelClient.searchAsync(searchRequest, new ActionListener<SearchResponse>() {
// Handle a successful response
@Override
public void onResponse(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits();
if(searchHits.length >0 ){
JSONObject obj = JSON.parseObject(searchHits[0].getSourceAsString());
Integer dimValue=obj.getInteger("age");
output.f4=dimValue;
cache.put(dimKey,dimValue);
System.out.println("将维度数据放入缓存: key=" dimKey ",value=" dimValue);
}
resultFuture.complete(Collections.singleton(output));
}
// Handle a failed response
@Override
public void onFailure(Exception e) {
output.f4=null;
resultFuture.complete(Collections.singleton(output));
}
});
}
// Handle timeouts
@Override
public void timeout(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
searchFromES(input,resultFuture);
}
}
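If the order of results does not need to match the input order, AsyncDataStream.unorderedWait can be used instead of orderedWait to reduce latency; a minimal sketch with the same (illustrative) parameters as above:
SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result =
AsyncDataStream.unorderedWait(
sourceFilter,
new ElasticsearchAsyncFunction(esHost, esPort, esUser, esPassword, esIndex, esType),
500, TimeUnit.MILLISECONDS, // timeout
10); // capacity: maximum number of concurrent requests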
- Notes
- This approach is not limited by memory, so it can handle fairly large dimension tables.
- It requires support from an external store.
- Access to the external store should be kept to a minimum.
Broadcast State
Implementation
- Send the dimension data to Kafka as a stream S1; the fact data is stream S2.
- Define a state descriptor (MapStateDescriptor), e.g. descriptor.
- Broadcast S1 with that descriptor, e.g. S1.broadcast(descriptor), producing a BroadcastStream B1.
- Connect the fact stream S2 with the broadcast stream B1, producing a BroadcastConnectedStream BC.
- On BC, implement the join logic in a KeyedBroadcastProcessFunction/BroadcastProcessFunction.
Code example
package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Uses Broadcast State to dynamically update configuration, filtering events in real time and enriching them with extra fields
*/
@Slf4j
public class TestBroadcastState {
public static void main(String[] args) throws Exception{
// 1. Parse command-line arguments
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
// Checkpoint configuration
String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
// Event stream configuration
String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
// Config stream configuration
String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
int fromMysqlPort = parameterTool.getInt("fromMysql.port");
String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");
// 2. Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the StateBackend
env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
// Configure checkpointing
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 3. Kafka event stream
// Read event data from Kafka
// Data: a user browsed or clicked a product at some time, e.g.
//{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
kafkaProperties.put("group.id",fromKafkaGroupID);
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setStartFromLatest();
DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, String, Integer>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out){
try {
JSONObject obj = JSON.parseObject(value);
String userID = obj.getString("userID");
String eventTime = obj.getString("eventTime");
String eventType = obj.getString("eventType");
int productID = obj.getIntValue("productID");
out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
}catch (Exception ex){
log.warn("异常数据:{}",value,ex);
}
}
});
// 4. MySQL config stream
// Custom MySQL source that periodically reads the configuration from MySQL; it will be broadcast
// Data: userID, userName, userAge
DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));
/*
(1) First create a MapStateDescriptor.
The MapStateDescriptor defines the state's name and the types of its key and value.
Here the key type is Void and the value type is Map<String, Tuple2<String, Integer>>.
*/
MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
/*
(2) Broadcast the config stream to obtain a BroadcastStream.
*/
BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);
// 5. Connect the event stream with the broadcast config stream to get a BroadcastConnectedStream
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);
// 6. Apply process() to the BroadcastConnectedStream to handle events according to the configuration (rules)
SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());
// 7. Emit results
resultStream.print();
// 8. Build the JobGraph and start execution
env.execute();
}
/**
* Custom BroadcastProcessFunction.
* An event is processed only when its userID appears in the configuration; the user's basic info is then added to the event.
* Tuple4<String, String, String, Integer>: element type of the first stream (events)
* HashMap<String, Tuple2<String, Integer>>: element type of the second stream (configuration)
* Tuple6<String, String, String, Integer, String, Integer>: output type
*/
static class CustomBroadcastProcessFunction extends BroadcastProcessFunction<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>{
/** The MapStateDescriptor */
MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
/**
* Read the broadcast state and, based on it, process elements of the event stream.
* @param value an element of the event stream
* @param ctx the context
* @param out emits zero or more records
* @throws Exception
*/
@Override
public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
// userID from the event
String userID = value.f0;
// Get the broadcast state
ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
Map<String, Tuple2<String, Integer>> broadcastStateUserInfo = broadcastState.get(null);
// If the user is present in the configuration, add the user's userName and userAge to the event;
// otherwise drop the event
Tuple2<String, Integer> userInfo = broadcastStateUserInfo.get(userID);
if(userInfo!=null){
out.collect(new Tuple6<>(value.f0,value.f1,value.f2,value.f3,userInfo.f0,userInfo.f1));
}
}
/**
* Process each element of the broadcast stream and update the state.
* @param value an element of the broadcast stream
* @param ctx the context
* @param out emits zero or more records
* @throws Exception
*/
@Override
public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
// Get the broadcast state
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
// Clear the state
broadcastState.clear();
// Update the state
broadcastState.put(null,value);
}
}
/**
* Custom MySQL source that fetches the configuration from MySQL every secondInterval seconds
*/
static class MysqlSource extends RichSourceFunction<HashMap<String, Tuple2<String, Integer>>> {
private String host;
private Integer port;
private String db;
private String user;
private String passwd;
private Integer secondInterval;
private volatile boolean isRunning = true;
private Connection connection;
private PreparedStatement preparedStatement;
MysqlSource(String host, Integer port, String db, String user, String passwd,Integer secondInterval) {
this.host = host;
this.port = port;
this.db = db;
this.user = user;
this.passwd = passwd;
this.secondInterval = secondInterval;
}
/**
* Establish the connection in open() when the source starts
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/" + db + "?useUnicode=true&characterEncoding=UTF-8", user, passwd);
String sql="select userID,userName,userAge from user_info";
preparedStatement=connection.prepareStatement(sql);
}
/**
* Close the connection and release resources in close() when the source finishes
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
if(connection!=null){
connection.close();
}
if(preparedStatement !=null){
preparedStatement.close();
}
}
/**
* run() is called to produce the data
* @param ctx
*/
@Override
public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
try {
while (isRunning){
HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
String userID = resultSet.getString("userID");
String userName = resultSet.getString("userName");
int userAge = resultSet.getInt("userAge");
output.put(userID,new Tuple2<>(userName,userAge));
}
ctx.collect(output);
// Run the query every secondInterval seconds
Thread.sleep(1000*secondInterval);
}
}catch (Exception ex){
log.error("从Mysql获取配置异常...",ex);
}
}
/**
* Called when the source is canceled
*/
@Override
public void cancel() {
isRunning = false;
}
}
}
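The file passed via --applicationProperties is assumed to contain the keys read above; an illustrative example (all values are placeholders):
checkpointDirectory=hdfs:///flink/checkpoints
checkpointSecondInterval=60
fromKafka.bootstrap.servers=kafka01:9092
fromKafka.group.id=c1_test
fromKafka.topic=test_1
fromMysql.host=localhost
fromMysql.port=3306
fromMysql.db=bigdata
fromMysql.user=root
fromMysql.passwd=******
fromMysql.secondInterval=60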
- Notes
- Changes to the dimension data have to be turned into a Kafka stream.
- Changes to the dimension data are picked up in real time.
- The dimension data is held in memory, so the supported data volume is relatively small.
UDTF + LATERAL TABLE Syntax
Implementation
- Assuming you are using Flink SQL: first, write a custom UDTF that extends the TableFunction abstract class and implements the open(), close(), and eval() methods.
- Register the TableFunction.
- In SQL, use the LATERAL TABLE syntax to join against the output of the UDTF.
Code example: Flink Table API
package com.bigdata.flink.tableSqlTemporalTable;
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.ProductInfo;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Summary:
* Temporal Table
*/
@Slf4j
public class Test {
public static void main(String[] args) throws Exception{
args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlTemporalTable/application.properties"};
// 1. Parse command-line arguments
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
//browse log
String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
//product history info
String productInfoTopic = parameterTool.getRequired("productHistoryInfoTopic");
String productInfoGroupID = parameterTool.getRequired("productHistoryInfoGroupID");
// 2. Set up the execution environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
// 3. Register the Kafka source
// Note: to make the relation between Beijing time and timestamps easier to follow, UserBrowseLog has an extra field eventTimeTimestamp holding the timestamp of eventTime
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream<UserBrowseLog> browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction())
.assignTimestampsAndWatermarks(new BrowseTimestampExtractor(Time.seconds(0)));
tableEnv.registerDataStream("browse",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice,browseRowtime.rowtime");
//tableEnv.toAppendStream(tableEnv.scan("browse"),Row.class).print();
// 4. Register the temporal table
// Note: likewise, ProductInfo has an extra field updatedAtTimestamp holding the timestamp of updatedAt
Properties productInfoProperties = new Properties();
productInfoProperties.put("bootstrap.servers",kafkaBootstrapServers);
productInfoProperties.put("group.id",productInfoGroupID);
DataStream<ProductInfo> productInfoStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties))
.process(new ProductInfoProcessFunction())
.assignTimestampsAndWatermarks(new ProductInfoTimestampExtractor(Time.seconds(0)));
tableEnv.registerDataStream("productInfo",productInfoStream, "productID,productName,productCategory,updatedAt,updatedAtTimestamp,productInfoRowtime.rowtime");
// Set the temporal table's time attribute and primary key
TemporalTableFunction productInfo = tableEnv.scan("productInfo").createTemporalTableFunction("productInfoRowtime", "productID");
// Register the TableFunction
tableEnv.registerFunction("productInfoFunc",productInfo);
//tableEnv.toAppendStream(tableEnv.scan("productInfo"),Row.class).print();
// 5. Run the SQL query
String sql = ""
+ "SELECT "
+ "browse.userID, "
+ "browse.eventTime, "
+ "browse.eventTimeTimestamp, "
+ "browse.eventType, "
+ "browse.productID, "
+ "browse.productPrice, "
+ "productInfo.productID, "
+ "productInfo.productName, "
+ "productInfo.productCategory, "
+ "productInfo.updatedAt, "
+ "productInfo.updatedAtTimestamp "
+ "FROM "
+ " browse, "
+ " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
+ "WHERE "
+ " browse.productID=productInfo.productID";
Table table = tableEnv.sqlQuery(sql);
tableEnv.toAppendStream(table,Row.class).print();
// 6. Start execution
tableEnv.execute(Test.class.getSimpleName());
}
/**
* Parse Kafka records
*/
static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
try {
UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
// Add a long timestamp field
// eventTime is Beijing time in yyyy-MM-dd HH:mm:ss format
DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// Convert to epoch milliseconds
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}
/**
* Extract timestamps and generate watermarks
*/
static class BrowseTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {
BrowseTimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(UserBrowseLog element) {
return element.getEventTimeTimestamp();
}
}
/**
* Parse Kafka records
*/
static class ProductInfoProcessFunction extends ProcessFunction<String, ProductInfo> {
@Override
public void processElement(String value, Context ctx, Collector<ProductInfo> out) throws Exception {
try {
ProductInfo log = JSON.parseObject(value, ProductInfo.class);
// Add a long timestamp field
// updatedAt is Beijing time in yyyy-MM-dd HH:mm:ss format
DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getUpdatedAt(), format).atOffset(ZoneOffset.of("+08:00"));
// Convert to epoch milliseconds
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setUpdatedAtTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}
/**
* Extract timestamps and generate watermarks
*/
static class ProductInfoTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<ProductInfo> {
ProductInfoTimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(ProductInfo element) {
return element.getUpdatedAtTimestamp();
}
}
}
Code example: Flink SQL
Define the UDTF:
package com.bigdata.flink.dimJoin;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import redis.clients.jedis.Jedis;
/**
* UDTF
*/
public class UDTFRedis extends TableFunction<Row> {
private Jedis jedis;
/**
* Open the connection
* @param context
* @throws Exception
*/
@Override
public void open(FunctionContext context) throws Exception {
jedis = new Jedis("localhost", 6379);
jedis.select(0);
}
/**
* Close the connection
* @throws Exception
*/
@Override
public void close() throws Exception {
if (jedis != null) {
jedis.close();
}
}
/**
* Look up dimension data in Redis
* @param key
*/
public void eval(String key) {
String value = jedis.get(key);
if (value != null) {
String[] valueSplit = value.split(",");
Row row = new Row(2);
row.setField(0, valueSplit[0]);
row.setField(1, Integer.valueOf(valueSplit[1]));
collector.collect(row);
}
}
/**
* Define the result type; the returned fields are userName and userAge, hence String and Int.
* @return
*/
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.STRING, Types.INT);
}
}
Kafka join with the Redis dimension table:
package com.bigdata.flink.dimJoin;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Author: Wang Pei
* Summary:
* Kafka Join Redis-Dim
*/
public class KafkaJoinRedisDimWithUDTF {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// Source DDL
// Kafka data: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
String sourceDDL = ""
+ "create table source_kafka "
+ "( "
+ " userID String, "
+ " eventType String, "
+ " eventTime String "
+ ") with ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'test_1', "
+ " 'connector.properties.group.id' = 'c1_test_1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(sourceDDL);
tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();
// UDTF DDL
// Data in Redis: userID -> userName,userAge
// 127.0.0.1:6379> get user_1
// "name1,10"
String udtfDDL = ""
+ "CREATE TEMPORARY FUNCTION "
+ " IF NOT EXISTS UDTFRedis "
+ " AS 'com.bigdata.flink.dimJoin.UDTFRedis'";
tableEnv.sqlUpdate(udtfDDL);
// Query
// Left Join
String execSQL = ""
+ "select "
+ " source_kafka.*,dim.* "
+ "from source_kafka "
+ "LEFT JOIN LATERAL TABLE(UDTFRedis(userID)) as dim (userName,userAge) ON TRUE";
Table table = tableEnv.sqlQuery(execSQL);
tableEnv.toAppendStream(table, Row.class).print();
tableEnv.execute(KafkaJoinRedisDimWithUDTF.class.getSimpleName());
}
}
- Notes
- A UDTF has to be defined and the LATERAL TABLE syntax used.
- Not very generic: it is hard to write one UDTF that covers every scenario of fetching dimension data from Redis.
- Relies on an external store; changes to the data are picked up promptly.
LookupableTableSource
Implementation
The source implements the LookupableTableSource interface.
In Flink SQL, registering the lookup table is enough; in the Flink Table API, the LookupFunction also has to be registered.
Under the hood, the dimension data is still fetched through a TableFunction; a rough sketch of the interface follows.
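For reference, the LookupableTableSource interface in the Flink version used here (1.9/1.10-era API) roughly has the following shape; implementing it mainly means providing a synchronous and/or asynchronous TableFunction for the given lookup keys (a sketch from memory, so check the javadoc of your Flink version):
public interface LookupableTableSource<T> extends TableSource<T> {
// synchronous lookup, e.g. the getLookupFunction(...) calls used in the examples below
TableFunction<T> getLookupFunction(String[] lookupKeys);
// asynchronous lookup variant
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
// tells the planner which variant to use
boolean isAsyncEnabled();
}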
Code example: Flink Table API
package com.bigdata.flink.tableSqlLookableTableSource;
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Lookup Table Source
*/
@Slf4j
public class Test {
public static void main(String[] args) throws Exception{
args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlLookableTableSource/application.properties"};
// 1. Parse command-line arguments
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
String hbaseZookeeperQuorum = parameterTool.getRequired("hbaseZookeeperQuorum");
String hbaseZnode = parameterTool.getRequired("hbaseZnode");
String hbaseTable = parameterTool.getRequired("hbaseTable");
String mysqlDBUrl = parameterTool.getRequired("mysqlDBUrl");
String mysqlUser = parameterTool.getRequired("mysqlUser");
String mysqlPwd = parameterTool.getRequired("mysqlPwd");
String mysqlTable = parameterTool.getRequired("mysqlTable");
// 2. Set up the execution environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
// 3. Register the Kafka source
// Hand-made test data: a user clicked a product at some time, together with the product price, e.g.
//{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_1", "productPrice": 20}
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream<UserBrowseLog> browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("kafka",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice");
//tableEnv.toAppendStream(tableEnv.scan("kafka"),Row.class).print();
// 4. Register the HBase source (lookup table source)
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
conf.set("zookeeper.znode.parent",hbaseZnode);
HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, hbaseTable);
hBaseTableSource.setRowKey("uid",String.class);
hBaseTableSource.addColumn("f1","name",String.class);
hBaseTableSource.addColumn("f1","age",Integer.class);
tableEnv.registerTableSource("hbase",hBaseTableSource);
// Register the TableFunction
tableEnv.registerFunction("hbaseLookup", hBaseTableSource.getLookupFunction(new String[]{"uid"}));
// 5. Register the MySQL source (lookup table source)
String[] mysqlFieldNames={"pid","productName","productCategory","updatedAt"};
DataType[] mysqlFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()};
TableSchema mysqlTableSchema = TableSchema.builder().fields(mysqlFieldNames, mysqlFieldTypes).build();
JDBCOptions jdbcOptions = JDBCOptions.builder()
.setDriverName("com.mysql.jdbc.Driver")
.setDBUrl(mysqlDBUrl)
.setUsername(mysqlUser)
.setPassword(mysqlPwd)
.setTableName(mysqlTable)
.build();
JDBCLookupOptions jdbcLookupOptions = JDBCLookupOptions.builder()
.setCacheExpireMs(10 * 1000) // cache TTL
.setCacheMaxSize(10) // maximum number of cached rows
.setMaxRetryTimes(3) // maximum number of retries
.build();
JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
.setOptions(jdbcOptions)
.setLookupOptions(jdbcLookupOptions)
.setSchema(mysqlTableSchema)
.build();
tableEnv.registerTableSource("mysql",jdbcTableSource);
// Register the TableFunction
tableEnv.registerFunction("mysqlLookup",jdbcTableSource.getLookupFunction(new String[]{"pid"}));
// 6. Query
// Look up the user's basic info in HBase by userID
// Look up the product's basic info in MySQL by productID
String sql = ""
+ "SELECT "
+ " userID, "
+ " eventTime, "
+ " eventType, "
+ " productID, "
+ " productPrice, "
+ " f1.name AS userName, "
+ " f1.age AS userAge, "
+ " productName, "
+ " productCategory "
+ "FROM "
+ " kafka, "
+ " LATERAL TABLE(hbaseLookup(userID)), "
+ " LATERAL TABLE (mysqlLookup(productID))";
tableEnv.toAppendStream(tableEnv.sqlQuery(sql),Row.class).print();
// 7. Start execution
tableEnv.execute(Test.class.getSimpleName());
}
/**
* Parse Kafka records
*/
static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
try {
UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
// Add a long timestamp field
// eventTime is Beijing time in yyyy-MM-dd HH:mm:ss format
DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// Convert to epoch milliseconds
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}
}
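The application.properties assumed by this example would contain roughly the following keys (values are placeholders):
kafkaBootstrapServers=kafka01:9092
browseTopic=test_1
browseTopicGroupID=c1_test
hbaseZookeeperQuorum=zk01:2181
hbaseZnode=/hbase
hbaseTable=dim_user
mysqlDBUrl=jdbc:mysql://localhost:3306/bigdata
mysqlUser=root
mysqlPwd=******
mysqlTable=t_product_info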
Code example: Flink SQL
package com.bigdata.flink.dimJoin;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Author: Wang Pei
* Summary:
* Kafka Join Mysql-Dim
*/
public class KafkaJoinMysqlDim {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// Source DDL
// Kafka data: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
String sourceDDL = ""
+ "create table source_kafka "
+ "( "
+ " userID STRING, "
+ " eventType STRING, "
+ " eventTime STRING, "
+ " proctime AS PROCTIME() "
+ ") with ( "
+ " 'connector.type' = 'kafka', "
+ " 'connector.version' = '0.10', "
+ " 'connector.properties.bootstrap.servers' = 'kafka01:9092', "
+ " 'connector.properties.zookeeper.connect' = 'kafka01:2181', "
+ " 'connector.topic' = 'test_1', "
+ " 'connector.properties.group.id' = 'c1_test_1', "
+ " 'connector.startup-mode' = 'latest-offset', "
+ " 'format.type' = 'json' "
+ ")";
tableEnv.sqlUpdate(sourceDDL);
//tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();
// Dim DDL
// MySQL dimension data
// mysql> select * from t_user_info limit 1;
// +--------+----------+---------+
// | userID | userName | userAge |
// +--------+----------+---------+
// | user_1 | name1    | 10      |
// +--------+----------+---------+
String dimDDL = ""
+ "CREATE TABLE dim_mysql ( "
+ " userID STRING, "
+ " userName STRING, "
+ " userAge INT "
+ ") WITH ( "
+ " 'connector.type' = 'jdbc', "
+ " 'connector.url' = 'jdbc:mysql://localhost:3306/bigdata', "
+ " 'connector.table' = 't_user_info', "
+ " 'connector.driver' = 'com.mysql.jdbc.Driver', "
+ " 'connector.username' = '****', "
+ " 'connector.password' = '******' "
+ ")";
tableEnv.sqlUpdate(dimDDL);
// Query
// Left Join
String execSQL = ""
+ "SELECT "
+ " kafka.*,mysql.userName,mysql.userAge "
+ "FROM "
+ " source_kafka as kafka"
+ " LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF kafka.proctime AS mysql "
+ " ON kafka.userID = mysql.userID";
Table table = tableEnv.sqlQuery(execSQL);
tableEnv.toAppendStream(table, Row.class).print();
tableEnv.execute(KafkaJoinMysqlDim.class.getSimpleName());
}
}
- Notes
- The LookupableTableSource interface has to be implemented.
- Fairly generic.
- Relies on an external store; changes to the data are picked up promptly.
- Currently only supported by the Blink planner.