1. Testing Flink's three join types in code
1.1 Data sources
(1) Left stream
```
Orders table (orders)
id  productName  orderTime
1   iphone       2020-04-01 10:00:00.0
2   mac          2020-04-01 10:02:00.0
3   huawei       2020-04-01 10:03:00.0
4   pad          2020-04-01 10:05:00.0
```
(2) Right stream
```
Shipments table (shipments)
shipId  orderId  status     shiptime
0       1        shipped    2020-04-01 11:00:00.0
1       2        delivered  2020-04-01 17:00:00.0
2       3        shipped    2020-04-01 12:00:00.0
3       4        shipped    2020-04-01 11:30:00.0
```
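The sources emit timestamps as epoch milliseconds, but the tables show wall-clock times. The analysis below assumes those wall-clock times are in UTC+8 (Asia/Shanghai). That zone is inferred from the sample data and is not declared by the sources. This small sketch uses only `java.time` to check the mapping for a few sample records.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Converts the epoch-millisecond timestamps of the sample records into wall-clock time.
// Assumption: the wall-clock times in the tables are Asia/Shanghai (UTC+8).
final class EpochMillisToClock {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("Asia/Shanghai"));

    static String toClock(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long[] samples = {1585706400000L, 1585710000000L, 1585731600000L};
        for (long ts : samples) {
            System.out.println(ts + " -> " + toClock(ts));
        }
    }
}
```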
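1.2 join
(1) Code
```scala
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// OrdersSource, ShipmentsSource and the TimedataAssignerWithPeriodicWatermarks(2) classes
// are the author's own helpers and are not shown in this article.

// Watermark delay (allowed out-of-orderness): 0 s
val delay = 0
// Window size: 4 hours
val window = 4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
left.join(right)
  .where(_._1).equalTo(_._2) // join key: field 1 of left (id) equals field 2 of right (orderId)
  .window(TumblingEventTimeWindows.of(Time.hours(window))) // tumbling window
  // IN1 (Int, String, Long)                 id, productName, orderTime
  // IN2 (Int, Int, String, Long)            shipId, orderId, status, shiptime
  // OUT (Int, String, String, Long, Long)   orderId, productName, status, orderTime, shiptime
  .apply(new JoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)] {
    override def join(first: (Int, String, Long), second: (Int, Int, String, Long)): (Int, String, String, Long, Long) = {
      (first._1, first._2, second._3, first._3, second._4)
    }
  })
  .print()
env.execute()
```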
(2) Analysis and output
Rules used in this analysis:
WM = largest event time seen so far - allowed delay
WM only ever increases.
A window fires when WM reaches the window's end boundary (precisely: WM >= window end - 1 ms),
i.e. largest event time seen so far - allowed delay >= window end.
With window = 4, the epoch-aligned tumbling windows in UTC+8 are [08:00, 12:00), [12:00, 16:00) and [16:00, 20:00).
```
record                         nowTimeStamp   nowTime   currentMaxT  WM        window (UTC+8)
Orders table (orders)
(1,iphone,1585706400000)       1585706400000  10:00:00  10:00:00     10:00:00  [08:00, 12:00)
(2,mac,1585706520000)          1585706520000  10:02:00  10:02:00     10:02:00  [08:00, 12:00)
(3,huawei,1585706580000)       1585706580000  10:03:00  10:03:00     10:03:00  [08:00, 12:00)
(4,pad,1585706700000)          1585706700000  10:05:00  10:05:00     10:05:00  [08:00, 12:00)
Shipments table (shipments)
(0,1,shipped,1585710000000)    1585710000000  11:00:00  11:00:00     11:00:00  [08:00, 12:00)
(1,2,delivered,1585731600000)  1585731600000  17:00:00  17:00:00     17:00:00  [16:00, 20:00)
(2,3,shipped,1585713600000)    1585713600000  12:00:00  17:00:00     17:00:00  [12:00, 16:00)
(3,4,shipped,1585711800000)    1585711800000  11:30:00  17:00:00     17:00:00  [08:00, 12:00)  <- WM 17:00 > window end 12:00: window triggers
```
All four orders fall into the same window as the shipments (0,1,shipped,1585710000000) and (3,4,shipped,1585711800000). Only orders 1 and 4 find a shipment with the same key there, because the shipments of orders 2 and 3 fall into later windows.
When (3,4,shipped,1585711800000) arrives, the shipments stream's WM is 17:00, past the window end of 12:00, so the window is triggered. Strictly speaking, the join operator has two inputs and uses the minimum of their watermarks. The orders stream's WM stays at 10:05, so the [08:00, 12:00) window fires only once that watermark also passes 12:00. This presumably happens when the sources finish and Flink emits its final watermark.
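The trigger rule can be replayed in plain Java. This is a simulation, not Flink code. It assumes each input emits WM = max event time seen - delay (the rule stated above). It also models the join operator's clock as the minimum of its two input watermarks, which is how Flink combines the watermarks of a multi-input operator. With the sample data, the shipments watermark alone passes the 12:00 window end, but the combined watermark does not.

```java
import java.util.List;

// Replays the trigger rule from the analysis (a simulation, not Flink code).
// Assumptions: each input emits WM = (max event time seen so far) - delay, and the
// two-input join operator advances its clock with min(left WM, right WM).
final class WatermarkReplay {
    static long watermark(List<Long> timestamps, long delayMillis) {
        if (timestamps.isEmpty()) {
            return Long.MIN_VALUE; // no element seen yet
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts); // the running maximum, and hence the WM, never decreases
        }
        return max - delayMillis;
    }

    // A window [start, end) fires once the watermark reaches its last millisecond, end - 1.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        List<Long> orders = List.of(1585706400000L, 1585706520000L, 1585706580000L, 1585706700000L);
        List<Long> shipments = List.of(1585710000000L, 1585731600000L, 1585713600000L, 1585711800000L);
        long windowEnd = 1585713600000L; // 12:00 UTC+8, end of the [08:00, 12:00) window
        long leftWm = watermark(orders, 0);
        long rightWm = watermark(shipments, 0);
        long operatorWm = Math.min(leftWm, rightWm);
        System.out.println("shipments WM alone reaches the window end: " + fires(rightWm, windowEnd)); // true
        System.out.println("join operator fires: " + fires(operatorWm, windowEnd));                    // false
    }
}
```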
Output:
```
Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
```
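The two rows can be cross-checked with a plain-Java simulation of a tumbling-window inner join. This is not the Flink API. The window start uses the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset` with offset 0, `ts - (ts + size) % size`. Two records join only if they share both the key and the window start. The `Order`/`Shipment` records are hypothetical stand-ins for the tuples above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a tumbling event-time window inner join (not the Flink API).
final class TumblingWindowJoinSim {
    record Order(int id, String productName, long orderTime) {}
    record Shipment(int shipId, int orderId, String status, long shipTime) {}

    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(ts, 0, size).
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Emits (orderId, productName, status, orderTime, shipTime) for pairs with the same key and window.
    static List<String> join(List<Order> orders, List<Shipment> shipments, long size) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Shipment s : shipments) {
                boolean sameKey = o.id() == s.orderId();
                boolean sameWindow = windowStart(o.orderTime(), size) == windowStart(s.shipTime(), size);
                if (sameKey && sameWindow) {
                    out.add("(" + o.id() + "," + o.productName() + "," + s.status() + ","
                            + o.orderTime() + "," + s.shipTime() + ")");
                }
            }
        }
        return out;
    }

    static List<Order> orders() {
        return List.of(
                new Order(1, "iphone", 1585706400000L),
                new Order(2, "mac", 1585706520000L),
                new Order(3, "huawei", 1585706580000L),
                new Order(4, "pad", 1585706700000L));
    }

    static List<Shipment> shipments() {
        return List.of(
                new Shipment(0, 1, "shipped", 1585710000000L),
                new Shipment(1, 2, "delivered", 1585731600000L),
                new Shipment(2, 3, "shipped", 1585713600000L),
                new Shipment(3, 4, "shipped", 1585711800000L));
    }

    public static void main(String[] args) {
        long fourHours = 4 * 60 * 60 * 1000L;
        join(orders(), shipments(), fourHours).forEach(System.out::println);
    }
}
```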
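1.3 intervalJoin
In SQL, an interval join supports INNER JOIN, LEFT JOIN, RIGHT JOIN and FULL JOIN. A plain JOIN defaults to INNER JOIN. The DataStream intervalJoin API in 1.3.1 performs inner joins only.
SEMI JOIN and ANTI JOIN are not supported yet.
TIMEBOUND_EXPRESSION is the interval condition on the time attributes of the left and right streams. Three forms are supported:
```sql
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
```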
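To make the three condition forms concrete, here they are as plain Java predicates over epoch-millisecond times. This is a sketch, and the method names are made up for illustration. Note that BETWEEN includes both ends, while the second form excludes its upper end.

```java
// Hypothetical helpers that spell out the three TIMEBOUND_EXPRESSION forms
// over epoch-millisecond event times.
final class TimeBoundConditions {
    static final long SECOND = 1_000L;
    static final long MINUTE = 60 * SECOND;

    // ltime = rtime
    static boolean equalTime(long ltime, long rtime) {
        return ltime == rtime;
    }

    // ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE (upper end excluded)
    static boolean withinTenMinutesAfter(long ltime, long rtime) {
        return ltime >= rtime && ltime < rtime + 10 * MINUTE;
    }

    // ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND (both ends included)
    static boolean betweenMinus10Plus5Seconds(long ltime, long rtime) {
        return ltime >= rtime - 10 * SECOND && ltime <= rtime + 5 * SECOND;
    }

    public static void main(String[] args) {
        long r = 1_000_000L;
        System.out.println(withinTenMinutesAfter(r + 10 * MINUTE, r));      // false: upper end excluded
        System.out.println(betweenMinus10Plus5Seconds(r + 5 * SECOND, r)); // true: upper end included
    }
}
```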
1.3.1 intervalJoin with the DataStream API
(1) Code
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Watermark delay: 0 s
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
left.print("orderStream=>")
right.print("shipMentStream=>")
left
  .keyBy(_._1)                     // orders keyed by id
  .intervalJoin(right.keyBy(_._2)) // shipments keyed by orderId
  // between() only supports event time.
  // Time interval: each leftStream element joins the rightStream elements in [left + 0 h, left + 4 h],
  // i.e. an order joins the shipments that happen within 4 hours after it.
  .between(Time.hours(0), Time.hours(4))
  // exclude the lower bound
  //.lowerBoundExclusive()
  // exclude the upper bound
  //.upperBoundExclusive()
  .process(new ProcessJoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)] {
    override def processElement(
        orders: (Int, String, Long),
        shipments: (Int, Int, String, Long),
        context: ProcessJoinFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)]#Context,
        out: Collector[(Int, String, String, Long, Long)]): Unit = {
      // orderId, productName, status, orderTime, shipTime
      out.collect((orders._1, orders._2, shipments._3, orders._3, shipments._4))
    }
  })
  .print("IntervalJoin=>")
env.execute("IntervalJoinTest")
```
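(2) Analysis and output
Time interval: each leftStream element joins the rightStream elements whose timestamps fall in [left + 0 h, left + 4 h].
In other words, an order (leftStream) joins the shipments (rightStream) that happen at most 4 hours after it. Order 2 (mac, 10:02) produces no output because its shipment happens at 17:00, almost 7 hours later.
Output:
```
IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
```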
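The three rows can be reproduced with a plain-Java simulation of the interval condition. This is not the Flink API, and it ignores watermarks, late data and state cleanup. For matching keys, a pair is emitted when orderTime + lower <= shipTime <= orderTime + upper. Both bounds are inclusive, as with the default between(). The record types are hypothetical stand-ins for the tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of an inner interval join (not the Flink API).
// Watermarks, lateness and state cleanup are deliberately ignored.
final class IntervalJoinSim {
    record Order(int id, String productName, long orderTime) {}
    record Shipment(int shipId, int orderId, String status, long shipTime) {}

    static List<String> join(List<Order> orders, List<Shipment> shipments, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Shipment s : shipments) {
                boolean sameKey = o.id() == s.orderId();
                // default between(): both bounds inclusive
                boolean inRange = s.shipTime() >= o.orderTime() + lower && s.shipTime() <= o.orderTime() + upper;
                if (sameKey && inRange) {
                    out.add("(" + o.id() + "," + o.productName() + "," + s.status() + ","
                            + o.orderTime() + "," + s.shipTime() + ")");
                }
            }
        }
        return out;
    }

    static List<Order> orders() {
        return List.of(
                new Order(1, "iphone", 1585706400000L),
                new Order(2, "mac", 1585706520000L),
                new Order(3, "huawei", 1585706580000L),
                new Order(4, "pad", 1585706700000L));
    }

    static List<Shipment> shipments() {
        return List.of(
                new Shipment(0, 1, "shipped", 1585710000000L),
                new Shipment(1, 2, "delivered", 1585731600000L),
                new Shipment(2, 3, "shipped", 1585713600000L),
                new Shipment(3, 4, "shipped", 1585711800000L));
    }

    public static void main(String[] args) {
        long fourHours = 4 * 60 * 60 * 1000L;
        join(orders(), shipments(), 0, fourHours).forEach(System.out::println);
    }
}
```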
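1.3.2 intervalJoin with SQL
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Order, ShipMents and DateUtils.formatTime are the author's own helpers (not shown).
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
  .map(ele => Order(ele._1, ele._2, DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
  .map(ele => ShipMents(ele._1, ele._2, ele._3, DateUtils.formatTime(ele._4)))
val tableEnvironment = StreamTableEnvironment.create(env)
// Note: the planner only treats this as an interval join if orderTime and shipTime are
// event-time (rowtime) attributes, e.g. fromDataStream(left, $"id", $"productName", $"orderTime".rowtime),
// assuming those fields hold a Long or java.sql.Timestamp. Over plain columns it runs as a regular join.
val orderTable: Table = tableEnvironment.fromDataStream(left)
val shipmentsTable: Table = tableEnvironment.fromDataStream(right)
val table: Table = tableEnvironment.sqlQuery(
  s"""
     |SELECT o.id, o.productName, s.status
     |FROM $orderTable AS o
     |JOIN $shipmentsTable AS s ON o.id = s.orderId AND
     |  o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
     |""".stripMargin)
tableEnvironment.toAppendStream[(Int, String, String)](table).print("IntervalJoinTest")
env.execute()
```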
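Note: SQL and the API are written differently, but both mean that an order joins the shipments that happen within 4 hours after it. The SQL condition says shipTime - 4 h <= orderTime <= shipTime. This rearranges to orderTime <= shipTime <= orderTime + 4 h, which is exactly between(0 h, 4 h) with orders on the left.
```sql
o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
```
and
```scala
orderStream
  .keyBy(_._1)
  .intervalJoin(shipTimeStream.keyBy(_._2))
  .between(Time.hours(0), Time.hours(4))
```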
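Here is a quick numeric check that the SQL condition and between(0 h, 4 h) select the same pairs. It is a sketch, not Flink code. Both predicates are written over epoch milliseconds and compared for shipment offsets from -6 h to +6 h around one order, in 1-minute steps that hit both boundaries exactly.

```java
// Compares the SQL predicate with the DataStream between() predicate (a sketch, not Flink code).
final class BetweenEquivalence {
    static final long HOUR = 3_600_000L;

    // o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
    static boolean sqlForm(long orderTime, long shipTime) {
        return orderTime >= shipTime - 4 * HOUR && orderTime <= shipTime;
    }

    // between(Time.hours(0), Time.hours(4)): order + 0 h <= ship <= order + 4 h
    static boolean apiForm(long orderTime, long shipTime) {
        return shipTime >= orderTime && shipTime <= orderTime + 4 * HOUR;
    }

    // Checks every shipment offset from -6 h to +6 h around one order, in 1-minute steps.
    static boolean agreeOnSample(long orderTime) {
        for (long offset = -6 * HOUR; offset <= 6 * HOUR; offset += 60_000L) {
            if (sqlForm(orderTime, orderTime + offset) != apiForm(orderTime, orderTime + offset)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeOnSample(1585706400000L)); // true
    }
}
```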
1.4 coGroup
(1) Code
```scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Watermark delay: 0 s
val delay = 0
// Window size: 4 hours
val window = 4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
val leftJoinResult: DataStream[(Int, String, String, Long, Long)] = leftStream
  .coGroup(rightStream).where(_._1).equalTo(_._2) // left join on orders.id = shipments.orderId
  .window(TumblingEventTimeWindows.of(Time.hours(window))) // tumbling window
  // IN1 (Int, String, Long)                 id, productName, orderTime
  // IN2 (Int, Int, String, Long)            shipId, orderId, status, shiptime
  // OUT (Int, String, String, Long, Long)   orderId, productName, status, orderTime, shiptime
  .apply(new CoGroupFunction[(Int, String, Long), (Int, Int, String, Long), (Int, String, String, Long, Long)] {
    override def coGroup(first: java.lang.Iterable[(Int, String, Long)],
                         second: java.lang.Iterable[(Int, Int, String, Long)],
                         out: Collector[(Int, String, String, Long, Long)]): Unit = {
      for (firstEle <- first.asScala) {
        var flag = false
        for (secondEle <- second.asScala) {
          // left join: a matching shipment exists
          out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
          flag = true
        }
        // left join: no matching shipment in this window
        if (!flag) {
          out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
        }
      }
    }
  })
leftJoinResult.print()
env.execute()
```
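(2) Analysis and output
Windowing and key matching work exactly as in join. The difference is in the output: the coGroup function receives all elements of one key in one window from both sides at once, so it can also emit elements that have no partner. The code above therefore behaves like a SQL LEFT JOIN.
Output:
```
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)
```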
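The left-join behaviour can be reproduced in plain Java. This is a simulation, not the Flink API. Both inputs are grouped by (key, window start), and then the same loop as the CoGroupFunction above runs on each order group. That loop emits one row per matching shipment, or a row with "null" and -1 when there is none. Row order here follows the order list, whereas Flink's order depends on how keys are processed. The record types are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the windowed coGroup left join (not the Flink API).
final class CoGroupLeftJoinSim {
    record Order(int id, String productName, long orderTime) {}
    record Shipment(int shipId, int orderId, String status, long shipTime) {}

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    static String row(Order o, String status, long shipTime) {
        return "(" + o.id() + "," + o.productName() + "," + status + "," + o.orderTime() + "," + shipTime + ")";
    }

    // Mirrors the CoGroupFunction above: one row per partner, or a "null"/-1 row if there is none.
    static void coGroup(List<Order> first, List<Shipment> second, List<String> out) {
        for (Order o : first) {
            boolean flag = false;
            for (Shipment s : second) {
                out.add(row(o, s.status(), s.shipTime()));
                flag = true;
            }
            if (!flag) {
                out.add(row(o, "null", -1L));
            }
        }
    }

    static List<String> leftJoin(List<Order> orders, List<Shipment> shipments, long size) {
        // Group both inputs by (join key, window start), as coGroup does per key and window.
        Map<String, List<Order>> left = new LinkedHashMap<>();
        for (Order o : orders) {
            left.computeIfAbsent(o.id() + "@" + windowStart(o.orderTime(), size), k -> new ArrayList<>()).add(o);
        }
        Map<String, List<Shipment>> right = new HashMap<>();
        for (Shipment s : shipments) {
            right.computeIfAbsent(s.orderId() + "@" + windowStart(s.shipTime(), size), k -> new ArrayList<>()).add(s);
        }
        List<String> out = new ArrayList<>();
        // Groups that exist only on the shipment side emit nothing in this left join, so they are skipped.
        for (Map.Entry<String, List<Order>> group : left.entrySet()) {
            coGroup(group.getValue(), right.getOrDefault(group.getKey(), List.of()), out);
        }
        return out;
    }

    static List<Order> orders() {
        return List.of(new Order(1, "iphone", 1585706400000L), new Order(2, "mac", 1585706520000L),
                new Order(3, "huawei", 1585706580000L), new Order(4, "pad", 1585706700000L));
    }

    static List<Shipment> shipments() {
        return List.of(new Shipment(0, 1, "shipped", 1585710000000L), new Shipment(1, 2, "delivered", 1585731600000L),
                new Shipment(2, 3, "shipped", 1585713600000L), new Shipment(3, 4, "shipped", 1585711800000L));
    }

    public static void main(String[] args) {
        leftJoin(orders(), shipments(), 4 * 60 * 60 * 1000L).forEach(System.out::println);
    }
}
```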
2. intervalJoin source code walkthrough
2.1 Classes entered through the between method
```java
// Annotated outline of the call chain (pseudocode, not compilable Java)
org.apache.flink.streaming.api.datastream.KeyedStream (Java) {
    class IntervalJoin {
        method between {
            // IntervalJoin only supports event time
            if (timeCharacteristic != TimeCharacteristic.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            }
        }
    }
}

org.apache.flink.streaming.api.scala.KeyedStream (Scala) {
    class IntervalJoin {
        // Javadoc of between: leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
        method between {
            new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis) {
                // the lower bound is inclusive by default; this method excludes it
                method lowerBoundExclusive { this.lowerBoundInclusive = false }
                // the upper bound is inclusive by default; this method excludes it
                method upperBoundExclusive { this.upperBoundInclusive = false }
                // process takes the user-defined function
                method process(processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]) {
                    // the Scala process method delegates to the Java process method
                    asScalaStream(javaJoined.process(processJoinFunction, outType)) {
                        // in the Java process method, focus on IntervalJoinOperator and on connecting the streams
                        SingleOutputStreamOperator<OUT> process {
                            // [Key step 1]
                            // A TwoInputStreamOperator to execute time-bounded stream inner joins.
                            operator = new IntervalJoinOperator<>(...);
                            // [Key step 2]
                            return left
                                // connect the two KeyedStreams into ConnectedStreams so that they can share state;
                                // for intervalJoin this means elements with the same key from both streams can see each other
                                .connect(right)
                                // assigns keys to the elements; returns the partitioned ConnectedStreams
                                .keyBy(keySelector1, keySelector2)
                                // creates the transformed output stream
                                .transform("Interval Join", outputType, operator);
                        }
                    }
                }
            }
        }
    }
}
```
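The exclusive-bound methods above only flip a flag. To our reading of the source, the IntervalJoinOperator constructor first checks lowerBound <= upperBound. It then turns exclusive bounds into inclusive ones by moving them 1 ms inwards, so the matching loop only has to handle inclusive bounds. The sketch below mirrors that idea in isolation. The class and method names are hypothetical.

```java
// Hypothetical stand-alone version of the bound normalization (not Flink code).
final class IntervalBounds {
    final long lowerBound;
    final long upperBound;

    IntervalBounds(long lowerBound, long upperBound, boolean lowerInclusive, boolean upperInclusive) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        // Exclusive bounds become inclusive bounds 1 ms further inside the interval.
        this.lowerBound = lowerInclusive ? lowerBound : lowerBound + 1L;
        this.upperBound = upperInclusive ? upperBound : upperBound - 1L;
    }

    // Inclusive check for a left element: left + lowerBound <= right <= left + upperBound.
    boolean matches(long leftTs, long rightTs) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        IntervalBounds inclusive = new IntervalBounds(0, 4, true, true);
        IntervalBounds exclusive = new IntervalBounds(0, 4, false, false);
        System.out.println(inclusive.matches(10, 14)); // true
        System.out.println(exclusive.matches(10, 14)); // false
    }
}
```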
2.2 A closer look at key step 1: IntervalJoinOperator
```java
// Annotated excerpt of IntervalJoinOperator (pseudocode; signatures abbreviated)
class IntervalJoinOperator {
    // Both buffers are MapState, i.e. keyed state; think of the state as a local cache.
    // One buffer per stream: the Long key is an element timestamp, and the List holds all elements
    // with that timestamp (BufferEntry has two fields: element and hasBeenJoined).
    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    // initialize the MapState
    method initializeState {
        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(...));
        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(...));
    }

    // processElement1 handles the left stream; both sides call processElement
    method processElement1(StreamRecord<T1> record) {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    // processElement2 handles the right stream, with the bounds negated and swapped
    method processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    // When an element arrives on one stream (say the left), look in the other stream's buffer
    // for elements within the lower/upper range relative to it. Both methods above call this.
    method processElement(record, ourBuffer, otherBuffer, relativeLowerBound, relativeUpperBound, isLeft) {
        // value of the record
        final THIS ourValue = record.getValue();
        // timestamp of the record
        final long ourTimestamp = record.getTimestamp();
        // the timestamp must be meaningful, which in practice means event time
        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in "
                + "interval stream joins need to have timestamps meaningful timestamps.");
        }
        // a late record is dropped without any processing
        if (isLate(ourTimestamp)) {
            // the record's timestamp is below the current watermark
            return;
        }
        // add the record to this stream's MapState, flagged as not yet joined (false)
        addToBuffer(ourBuffer, ourValue, ourTimestamp);
        // iterate over the other stream's MapState: otherBuffer
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            // timestamp of the other stream's elements
            final long timestamp = bucket.getKey();
            // join only if ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound;
            // otherwise skip the bucket
            if (timestamp < ourTimestamp + relativeLowerBound
                    || timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }
            // run the user-defined function for every element in the matching bucket
            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                if (isLeft) {
                    // the arriving record is from the left stream
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    // the arriving record is from the right stream
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }
        // cleanup time for this record
        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        // See onEventTime below for how the state is cleared.
        // The cleanup timer is the record's timestamp + relativeUpperBound; once the watermark passes it,
        // the record is removed. Adding relativeUpperBound postpones the record's removal from state.
        if (isLeft) {
            // left stream: register the cleanup timer
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            // right stream: register the cleanup timer
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    // is the record late?
    method boolean isLate(long timestamp) {
        // current event-time watermark; the watermark only increases
        long currentWatermark = internalTimerService.currentWatermark();
        // true if the timestamp is below the watermark: the record is late and is not joined with the other stream
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    // add a record to its stream's MapState
    method void addToBuffer(buffer, value, timestamp) {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        // new records are flagged as not joined: new BufferEntry<>(value, false)
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }

    // collect: the larger of the two timestamps becomes the timestamp of the joined output record,
    // then the user's ProcessJoinFunction.processElement is called
    method collect(left, right, leftTimestamp, rightTimestamp) {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
        userFunction.processElement(left, right, context, collector);
    }

    // timer callback: remove buffered records once the watermark passes their cleanup time
    method onEventTime(InternalTimer<K, String> timer) {
        // the timer's timestamp is the registered cleanup time (not the record's own timestamp)
        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();
        logger.trace("onEventTime @ {}", timerTimestamp);
        switch (namespace) {
            // Example: a record with timestamp 10s arrives; between(1s, 5s), so lowerBound = 1s, upperBound = 5s.
            // Meaning: a left element joins the right elements in [left + 1, left + 5],
            //   i.e. left + 1s <= right <= left + 5s;
            // a right element joins the left elements in [right - 5, right - 1],
            //   i.e. right - 5s <= left <= right - 1s.
            // [Key point] cleanup time of the arriving record:
            // long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
            // Left state: cleanupTime = 10s + 5s = 15s, i.e. at 15s the left data with timestamp 10s can be removed.
            case CLEANUP_NAMESPACE_LEFT: {
                // left + 1s <= right <= left + 5s; lowerBound = 1s, upperBound = 5s.
                // For a left record processElement1 passes relativeUpperBound = 5 > 0, so timerTimestamp = 10 + 5 = 15s.
                // Here upperBound = 5 > 0, so timestamp = timerTimestamp - upperBound = 15 - 5 = 10s.
                // At 15s the left data with timestamp 10s can go: a right element at 15s looks for left elements
                // in 10s <= left <= 14s, so once the watermark passes 15s the left element at 10s is no longer needed.
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            // Right state: cleanupTime = 10s, i.e. at 10s the right data with timestamp 10s can be removed.
            case CLEANUP_NAMESPACE_RIGHT: {
                // right - 5s <= left <= right - 1s; here relativeLowerBound = -5 and relativeUpperBound = -1.
                // For a right record processElement2 passes relativeUpperBound = -1 <= 0, so timerTimestamp = 10s.
                // The branch below tests lowerBound (= 1), not relativeLowerBound (= -5), so
                // timestamp = timerTimestamp = 10s: the right data at 10s is removed, not data at 5s.
                // At 10s the right data with timestamp 10s can go: a left element at 10s looks for right elements
                // in 11s <= right <= 15s, so once the watermark passes 10s the right element at 10s is no longer needed.
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }
}
```
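The whole walkthrough can be condensed into a single-key, in-memory simulation. This is not Flink code. TreeMaps stand in for the two MapState buffers and for the timer service, and elements are plain strings. It follows the steps described above: drop late records, buffer the record, scan the other buffer with relative bounds, register a cleanup timer, and remove buckets when the watermark passes a timer. The usage in `main` replays the between(1s, 5s) example with times in seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-key, in-memory simulation of the IntervalJoinOperator logic (not Flink code).
// TreeMaps stand in for the MapState buffers and for the event-time timer service.
final class IntervalJoinOperatorSim {
    private final long lowerBound;
    private final long upperBound;
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    // cleanup timers: timestamp -> namespaces (true = left, false = right)
    private final TreeMap<Long, List<Boolean>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    IntervalJoinOperatorSim(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts, Map<Long, List<String>> ours, Map<Long, List<String>> other,
                         long relativeLowerBound, long relativeUpperBound, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return; // late record: dropped, like isLate()
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value); // addToBuffer
        for (Map.Entry<Long, List<String>> bucket : other.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relativeLowerBound || otherTs > ts + relativeUpperBound) {
                continue;
            }
            for (String otherValue : bucket.getValue()) {
                output.add(isLeft ? value + "+" + otherValue : otherValue + "+" + value);
            }
        }
        long cleanupTime = relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
        timers.computeIfAbsent(cleanupTime, k -> new ArrayList<>()).add(isLeft);
    }

    // Advancing the watermark fires every timer <= watermark, like onEventTime.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<Boolean>> timer = timers.pollFirstEntry();
            long timerTimestamp = timer.getKey();
            for (boolean isLeft : timer.getValue()) {
                if (isLeft) {
                    leftBuffer.remove(upperBound <= 0L ? timerTimestamp : timerTimestamp - upperBound);
                } else {
                    rightBuffer.remove(lowerBound <= 0L ? timerTimestamp + lowerBound : timerTimestamp);
                }
            }
        }
    }

    int bufferedLeftTimestamps() { return leftBuffer.size(); }

    int bufferedRightTimestamps() { return rightBuffer.size(); }

    public static void main(String[] args) {
        IntervalJoinOperatorSim op = new IntervalJoinOperatorSim(1, 5); // between(1s, 5s), times in seconds
        op.processLeft("L10", 10);
        op.processRight("R12", 12);    // 10 lies in [12 - 5, 12 - 1]: joins L10
        op.advanceWatermark(12);       // right timer at 12 removes R12
        op.processRight("R11", 11);    // late (11 < 12): dropped
        op.processRight("R15", 15);    // 10 lies in [10, 14]: joins L10
        op.advanceWatermark(15);       // left timer at 15 removes L10, right timer at 15 removes R15
        System.out.println(op.output); // [L10+R12, L10+R15]
    }
}
```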
2.3 State cleanup in detail
2.3.1 Cleanup time: cleanupTime
```java
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
```
2.3.2 Performing the cleanup: buffer.remove(timestamp)
- Assumption: the arriving record has timestamp 10s, and between() was called with 1s and 5s, so lowerBound = 1s and upperBound = 5s.
- Meaning: a left element joins the right elements in [left + 1, left + 5], i.e. left + 1s <= right <= left + 5s. A right element joins the left elements in [right - 5, right - 1], i.e. right - 5s <= left <= right - 1s.
- A left record with timestamp 10s arrives
(1) left + 1s <= right <= left + 5s; lowerBound = 1s, upperBound = 5s.
(2) For the left stream, processElement1 passes relativeUpperBound = upperBound = 5. Since relativeUpperBound > 0, the timer is registered at 10 + 5 = 15s.
```java
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
```
(3) When the timer fires, timerTimestamp = 15s. Since upperBound = 5 > 0, the removed left-buffer key is timerTimestamp - upperBound = 15 - 5 = 10s.
```java
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}", timestamp);
leftBuffer.remove(timestamp);
```
(4) Conclusion:
When event time reaches 15s, the left data with timestamp 10s can be removed. A right element at 15s looks for left elements in 10s <= left <= 14s, so once the watermark passes 15s the left element at 10s is no longer needed.
- A right record with timestamp 10s arrives
(1) right - 5s <= left <= right - 1s; here relativeLowerBound = -5 and relativeUpperBound = -1.
(2) For the right stream, processElement2 passes relativeUpperBound = -lowerBound = -1 <= 0, so the timer is registered at timerTimestamp = ourTimestamp = 10s.
```java
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
```
(3) The removal branch tests the original lowerBound (1 > 0), not relativeLowerBound (-5). The removed right-buffer key is therefore timerTimestamp = 10s, not 5s.
```java
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
```
(4) Conclusion:
When event time reaches 10s, the right data with timestamp 10s can be removed. Left elements that are not late from then on have timestamps of at least 10s and look for right elements from 11s onward (a left element at 10s looks in 11s <= right <= 15s). So once the watermark passes 10s, the right element at 10s is no longer needed.
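The arithmetic of both worked examples can be checked with a few lines of Java that copy the two formulas from the operator. This is a sketch, and the helper names are ours. The timer time comes from cleanupTime, and the removed bucket comes from the CLEANUP_NAMESPACE_LEFT/RIGHT branches. For both streams, the removed bucket turns out to be the record's own timestamp, whatever the bounds.

```java
// Re-computes the cleanup timer and the removed buffer key for one record (a sketch, not Flink code).
final class CleanupMath {
    // cleanupTime = relativeUpperBound > 0 ? ourTimestamp + relativeUpperBound : ourTimestamp
    static long cleanupTime(long ourTimestamp, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ourTimestamp + relativeUpperBound : ourTimestamp;
    }

    // Left record: processElement1 passes relativeUpperBound = upperBound.
    static long removedLeftKey(long ts, long upperBound) {
        long timer = cleanupTime(ts, upperBound);
        return (upperBound <= 0L) ? timer : timer - upperBound;
    }

    // Right record: processElement2 passes relativeUpperBound = -lowerBound.
    static long removedRightKey(long ts, long lowerBound) {
        long timer = cleanupTime(ts, -lowerBound);
        return (lowerBound <= 0L) ? timer + lowerBound : timer;
    }

    public static void main(String[] args) {
        // between(1s, 5s), record at 10s (values in seconds)
        System.out.println(cleanupTime(10, 5));     // 15: left timer
        System.out.println(removedLeftKey(10, 5));  // 10
        System.out.println(cleanupTime(10, -1));    // 10: right timer
        System.out.println(removedRightKey(10, 1)); // 10
    }
}
```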
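2.4 What to take away from the source code
2.4.1 MapState stores the state
The state is MapState, a kind of keyed state; think of it as a local cache that holds the data of each of the two streams. Its type is MapState<Long, List<BufferEntry<T>>>. The Long key is an element timestamp, and the List holds all elements with that timestamp (BufferEntry has two fields, element and hasBeenJoined).
2.4.2 State cleanup time
Left-stream cleanup time = ourTimestamp (the record's event time) + relativeUpperBound (the upper bound of the interval) when upperBound > 0; otherwise ourTimestamp.
Right-stream cleanup time = ourTimestamp (the record's event time) when lowerBound >= 0, as in the example above. With a negative lowerBound it becomes ourTimestamp - lowerBound.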
3. Differences between the three joins and when to use them
References
Differences between Flink DataStream Join, IntervalJoin and coGroup
https://blog.csdn.net/qq_33689414/article/details/93875881
(Alibaba Cloud Realtime Compute for Apache Flink) IntervalJoin statement
https://help.aliyun.com/document_detail/195298.html
(Principles) Apache Flink Talk Series - Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache Flink 漫谈系列 - Time Interval JOIN/#more
(State cleanup) Flink 1.11 intervalJoin: watermark generation and state cleanup, source-code reading and demo analysis
https://blog.csdn.net/qq_34864753/article/details/111183556
(Source code analysis) Flink Interval Join: usage and internals
https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242
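4. Joining multiple streams
So far we have joined two streams, but real workloads may need to join four or even six streams. How can that be done?
4.1 Scenario 1: several rarely-updated streams must be joined into one table in real time (several dimension tables joined into one wide dimension table)
Steps:
1. Use Canal to sync the MySQL binlog to Kafka in real time, producing one stream per table.
2. Asynchronously join the stream of table A with the other tables (B, C, D) still in MySQL, and apply the resulting inserts, updates and deletes to the target table E.
Prerequisites:
(1) The four MySQL tables are updated infrequently. With frequent updates, asynchronous lookups against MySQL may not sustain the required QPS.
(2) Which of B, C and D table A joins depends on the case: only join the tables needed to produce the inserts, updates and deletes of table E.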
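Here is a minimal sketch of step 2. It assumes table A's binlog arrives as insert/update/delete events keyed by id, and that B, C and D can be looked up by a foreign key carried in A. In-memory maps stand in for the MySQL tables and for the target table E. Lookups are synchronous for brevity, whereas the text above calls for asynchronous joins. All record and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keep wide table E in sync with change events of table A
// by looking up tables B, C and D (in-memory maps stand in for MySQL and for E).
final class WideTableMaintainer {
    enum Op { INSERT, UPDATE, DELETE }

    record ChangeA(Op op, long id, long bId, long cId, long dId) {}
    record RowE(long id, String b, String c, String d) {}

    private final Map<Long, String> tableB;
    private final Map<Long, String> tableC;
    private final Map<Long, String> tableD;
    final Map<Long, RowE> tableE = new HashMap<>();

    WideTableMaintainer(Map<Long, String> tableB, Map<Long, String> tableC, Map<Long, String> tableD) {
        this.tableB = tableB;
        this.tableC = tableC;
        this.tableD = tableD;
    }

    void apply(ChangeA change) {
        if (change.op() == Op.DELETE) {
            tableE.remove(change.id()); // a delete in A deletes the wide row in E
            return;
        }
        // INSERT and UPDATE both upsert the enriched row into E
        RowE row = new RowE(change.id(),
                tableB.get(change.bId()), tableC.get(change.cId()), tableD.get(change.dId()));
        tableE.put(change.id(), row);
    }
}
```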
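4.2 Joining two streams (fact table with dimension table)
Steps:
1. Use Canal to sync the MySQL binlog to Kafka in real time, giving the fact-table stream A.
2. Use Canal to sync dimension table B from MySQL to Phoenix in real time.
3. Asynchronously join stream A from Kafka with dimension table B in Phoenix, and write the result to result table C in Phoenix.
Note:
(1) Dimension table B is synced to Phoenix because the lookup QPS against it is high ("dimension table" is meant broadly here). If the QPS is low, dimension table B in MySQL can be used directly.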
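Step 3 can be sketched with plain `CompletableFuture` in place of Flink's async I/O operator. Each fact record of stream A triggers a non-blocking lookup of dimension table B, all lookups run concurrently, and the enriched records are collected in input order. The `DimensionStore` interface and its in-memory implementation are hypothetical stand-ins for Phoenix or MySQL.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch of step 3 with CompletableFuture instead of Flink's async I/O operator.
// DimensionStore and inMemory(...) are hypothetical stand-ins for Phoenix or MySQL.
final class AsyncDimJoin {
    record Fact(long id, long dimKey, double amount) {}
    record Enriched(long id, String dimName, double amount) {}

    interface DimensionStore {
        CompletableFuture<String> lookup(long key);
    }

    // In-memory store: each lookup completes asynchronously on the common pool.
    static DimensionStore inMemory(Map<Long, String> rows) {
        return key -> CompletableFuture.supplyAsync(() -> rows.getOrDefault(key, "unknown"));
    }

    static List<Enriched> enrich(List<Fact> facts, DimensionStore store) {
        // Start all lookups first so that they can run concurrently...
        List<CompletableFuture<Enriched>> pending = facts.stream()
                .map(f -> store.lookup(f.dimKey()).thenApply(name -> new Enriched(f.id(), name, f.amount())))
                .collect(Collectors.toList());
        // ...then wait for them; the result list keeps the input order.
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```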
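4.3 Joining two fact tables (without TimeWindowJoin)
Joining two fact tables with a TimeWindowJoin keeps the data in Flink state. This approach differs in two ways. First, it uses third-party storage (Phoenix) instead. Second, it avoids a weakness of IntervalJoin: if one stream is delayed beyond the state retention time, data is lost. It solves that problem with a coGroup join plus side outputs.
Steps:
1. Use Canal to sync the MySQL binlog to Kafka in real time, giving fact-table streams A and B.
2. coGroup stream A with stream B.
3. Route late records of stream A to a side output (OutputTag) and join them with table B asynchronously through an API/DB lookup. Likewise, route late records of stream B to a side output and join them with table A.
4. UNION: union all resulting streams and write them to Phoenix table C.
Note:
Unlike IntervalJoin, late data does not depend on what is still held in Flink state. Late records are pulled out through side outputs and joined asynchronously with the data in MySQL.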
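The routing in steps 2 to 4 can be sketched in plain Java. This is not the Flink API. Records of stream A older than the current watermark go to a "late" list, standing in for a side output, and are completed by a lookup against table B in the database. On-time records are paired with stream B's records of the same key, a simplified stand-in for the windowed coGroup. Both results are then unioned. Only the A side is shown, since the B side would be symmetric. The event type, the lookup map and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of steps 2-4 for stream A (stream B would be handled symmetrically).
// Not the Flink API: the "late" list stands in for a side output and dbTableB for table B in MySQL.
final class LateDataRouting {
    record Event(long key, long ts, String value) {}

    static List<String> process(List<Event> streamA, List<Event> streamB, long watermark, Map<Long, String> dbTableB) {
        List<Event> onTime = new ArrayList<>();
        List<Event> late = new ArrayList<>();
        for (Event a : streamA) {
            if (a.ts() < watermark) {
                late.add(a);   // would go to the side output (OutputTag)
            } else {
                onTime.add(a); // would go through the coGroup
            }
        }
        List<String> union = new ArrayList<>();
        // on-time path: pair with B events of the same key (stand-in for the windowed coGroup)
        for (Event a : onTime) {
            for (Event b : streamB) {
                if (a.key() == b.key()) {
                    union.add(a.value() + "|" + b.value());
                }
            }
        }
        // late path: complete the record with a lookup against table B in the database
        for (Event a : late) {
            String b = dbTableB.get(a.key());
            if (b != null) {
                union.add(a.value() + "|" + b);
            }
        }
        return union;
    }
}
```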