Flink Two-Stream and Multi-Stream Joins: Differences Between Join, IntervalJoin and coGroup, and Production Use

2022-12-05 09:09:00

1. Testing the three Flink join types in code

1.1 Data sources

(1) Left stream

Orders table (orders)
id   productName   orderTime
1    iphone        2020-04-01 10:00:00.0
2    mac           2020-04-01 10:02:00.0
3    huawei        2020-04-01 10:03:00.0
4    pad           2020-04-01 10:05:00.0

(2) Right stream

Shipments table (shipments)
shipId  orderId  status     shiptime
0       1        shipped    2020-04-01 11:00:00.0
1       2        delivered  2020-04-01 17:00:00.0
2       3        shipped    2020-04-01 12:00:00.0
3       4        shipped    2020-04-01 11:30:00.0
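
The article later prints these times as epoch milliseconds (for example 1585706400000 for 10:00). The plain-Java sketch below performs that conversion. It assumes the times are Asia/Shanghai (UTC+8) local times, which matches the printed values. The class name SampleData is hypothetical, and the author's OrdersSource and ShipmentsSource are not shown in the article.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Sample data from section 1.1 as epoch milliseconds.
// Assumption: the times are Asia/Shanghai (UTC+8) local times.
class SampleData {
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    // Epoch millis for 2020-04-01 hour:minute in UTC+8.
    static long millis(int hour, int minute) {
        return LocalDateTime.of(2020, 4, 1, hour, minute)
                .atZone(ZONE)
                .toInstant()
                .toEpochMilli();
    }

    // orders: {id, orderTime}
    static final long[][] ORDERS = {
        {1, millis(10, 0)}, {2, millis(10, 2)}, {3, millis(10, 3)}, {4, millis(10, 5)}
    };

    // shipments: {shipId, orderId, shipTime}
    static final long[][] SHIPMENTS = {
        {0, 1, millis(11, 0)}, {1, 2, millis(17, 0)}, {2, 3, millis(12, 0)}, {3, 4, millis(11, 30)}
    };

    public static void main(String[] args) {
        for (long[] o : ORDERS) System.out.println("order " + o[0] + " -> " + o[1]);
    }
}
```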

1.2 join

(1) Code

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// OrdersSource, ShipmentsSource and TimedataAssignerWithPeriodicWatermarks(2) are the author's
// own classes and are not shown in the article

// allowed delay: 0 s
val delay = 0
// window size: 4 hours
val window = 4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.join(right)
  .where(_._1).equalTo(_._2)  // join key: the left stream's 1st field (id) equals the right stream's 2nd field (orderId)
  .window(TumblingEventTimeWindows.of(Time.hours(window)))  // tumbling window
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)  orderId productName status orderTime shiptime
  .apply(new JoinFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def join(first: (Int,String,Long), second: (Int, Int,String,Long)):(Int,String,String,Long,Long) = {
      (first._1, first._2, second._3, first._3, second._4)
    }
  }).print()

env.execute()
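
The result of the windowed join can be modeled without Flink. The plain-Java sketch below uses hypothetical names and is not Flink code. It assigns every record to its epoch-aligned tumbling window and emits one row for each pair that has the same key and the same window, formatted like the printed Scala tuples. It models only the result: watermarks, parallelism and Flink's output order are ignored. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the windowed inner join in section 1.2 (not Flink code).
class WindowJoinSketch {
    record Order(int id, String product, long orderTime) {}
    record Shipment(int shipId, int orderId, String status, long shipTime) {}

    static final long HOUR = 3_600_000L;

    static final List<Order> ORDERS = List.of(
            new Order(1, "iphone", 1585706400000L), new Order(2, "mac", 1585706520000L),
            new Order(3, "huawei", 1585706580000L), new Order(4, "pad", 1585706700000L));
    static final List<Shipment> SHIPMENTS = List.of(
            new Shipment(0, 1, "shipped", 1585710000000L), new Shipment(1, 2, "delivered", 1585731600000L),
            new Shipment(2, 3, "shipped", 1585713600000L), new Shipment(3, 4, "shipped", 1585711800000L));

    // Start of the epoch-aligned tumbling window containing ts (offset 0).
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // One output row per (order, shipment) pair with the same key in the same window.
    static List<String> join(List<Order> orders, List<Shipment> shipments, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Shipment s : shipments) {
                boolean sameKey = o.id() == s.orderId();
                boolean sameWindow = windowStart(o.orderTime(), windowSize) == windowStart(s.shipTime(), windowSize);
                if (sameKey && sameWindow) {
                    out.add("(" + o.id() + "," + o.product() + "," + s.status() + ","
                            + o.orderTime() + "," + s.shipTime() + ")");
                }
            }
        }
        return out;
    }
}
```

With the sample data the sketch returns the same two rows as the Flink job for both 4-hour and 2-hour windows. In both cases the 12:00 shipment of order 3 falls into a different window from its order.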

(2) Analysis and output

WM = largest event time seen so far on the stream - allowed delay
WM only ever increases
Window firing condition: WM >= the window's end boundary
                         i.e. largest event time seen - allowed delay >= the window's end boundary
                         (strictly, Flink fires once WM >= end - 1 ms, i.e. window.maxTimestamp())

record                          nowTimeStamp    nowTime    currentMaxT   WM         window               window (h)   WM (h)
Orders table (orders)
(1,iphone,1585706400000)        1585706400000   10:00:00   10:00:00      10:00:00   10:00:00-12:00:00    [10-12)      10:00
(2,mac,1585706520000)           1585706520000   10:02:00   10:02:00      10:02:00   10:00:00-12:00:00    [10-12)      10:02
(3,huawei,1585706580000)        1585706580000   10:03:00   10:03:00      10:03:00   10:00:00-12:00:00    [10-12)      10:03
(4,pad,1585706700000)           1585706700000   10:05:00   10:05:00      10:05:00   10:00:00-12:00:00    [10-12)      10:05

Shipments table (shipments)
(0,1,shipped,1585710000000)     1585710000000   11:00:00   11:00:00      11:00:00   10:00:00-12:00:00    [10-12)      11
(1,2,delivered,1585731600000)   1585731600000   17:00:00   17:00:00      17:00:00   16:00:00-18:00:00    [16-18)      17
(2,3,shipped,1585713600000)     1585713600000   12:00:00   17:00:00      17:00:00   12:00:00-14:00:00    [12-14)      17
(3,4,shipped,1585711800000)     1585711800000   11:30:00   17:00:00      17:00:00   10:00:00-12:00:00    [10-12)      17   // WM 17 > window end 12: the window fires

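All four orders fall into the same window as the shipments (0,1,shipped,1585710000000) and (3,4,shipped,1585711800000).

When the shipment (3,4,shipped,1585711800000) arrives, the shipments stream's WM is 17 (hour), which is greater than the window's end boundary 12 (hour), so the window fires.

Two caveats apply. First, the window column above corresponds to 2-hour windows. With window = 4, as in the code, tumbling windows are aligned to the epoch (UTC), so in UTC+8 they are [08:00,12:00), [12:00,16:00) and [16:00,20:00). The end boundary is still 12:00 and the join result is the same. Second, the join is a two-input operator, and its watermark is the minimum of its two inputs' watermarks. The orders stream's WM stops at 10:05, so the shipments WM alone cannot fire the window. Presumably the window fires when the bounded sources finish and Flink emits a final Long.MAX_VALUE watermark. This also explains why (3,4,shipped,1585711800000) is not dropped as late even though the shipments WM is already 17:00.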
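
The arithmetic behind this analysis can be checked with a small plain-Java sketch. The names are hypothetical and this is not Flink code:
- windowStart is equivalent, for positive timestamps, to Flink's TimeWindow.getWindowStartWithOffset with offset 0.
- watermark assumes that the author's assigner, which is not shown, computes max timestamp - delay.
- combinedWatermark applies the minimum rule for two-input operators.
- fires and windowIsLate restate the event-time trigger condition and the window operator's late-element check.

```java
// Arithmetic behind the window analysis of section 1.2 (plain Java, not Flink code).
class WatermarkSketch {
    static final long HOUR = 3_600_000L;

    // Start of the epoch-aligned tumbling window containing ts (offset 0).
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Assumed periodic watermark: largest timestamp seen so far minus the allowed delay.
    static long watermark(long maxTimestampSeen, long delay) {
        return maxTimestampSeen - delay;
    }

    // A two-input operator such as the window join only advances to the smaller input watermark.
    static long combinedWatermark(long leftWm, long rightWm) {
        return Math.min(leftWm, rightWm);
    }

    // Event-time trigger: fire once the watermark reaches window.maxTimestamp() = end - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // An incoming element is dropped if its window is already past:
    // maxTimestamp + allowedLateness <= watermark.
    static boolean windowIsLate(long windowEnd, long allowedLateness, long watermark) {
        return windowEnd - 1 + allowedLateness <= watermark;
    }
}
```

With the sample data, the 4-hour window [08:00, 12:00) (UTC+8) starts at 1585699200000 and ends at 1585713600000. The shipments watermark of 17:00 on its own would fire this window, and it would also make the 11:30 shipment late. The combined watermark of 10:05 does neither.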

Output:

Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)

1.3 intervalJoin

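In SQL (per the Alibaba Cloud Realtime Compute for Apache Flink documentation in the references), interval joins support INNER JOIN, LEFT JOIN, RIGHT JOIN and FULL JOIN, and a plain JOIN defaults to INNER JOIN. The DataStream API intervalJoin in 1.3.1, by contrast, only performs inner joins.

SEMI JOIN and ANTI JOIN are not supported yet.

TIMEBOUND_EXPRESSION is the interval condition on the time attribute columns of the left and right streams. Three forms are supported: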

ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
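
The three forms differ mainly in which end points they include. The hypothetical plain-Java sketch below, which is neither Flink nor Alibaba Cloud code, writes each form as a predicate on epoch-millisecond times. The < in the second form excludes the upper end, while BETWEEN includes both ends.

```java
// The three TIMEBOUND_EXPRESSION forms as predicates on epoch milliseconds (hypothetical names).
class TimeBounds {
    static final long SECOND = 1_000L;
    static final long MINUTE = 60 * SECOND;

    // ltime = rtime
    static boolean equalTime(long ltime, long rtime) {
        return ltime == rtime;
    }

    // ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE (upper end excluded)
    static boolean withinTenMinutesAfter(long ltime, long rtime) {
        return ltime >= rtime && ltime < rtime + 10 * MINUTE;
    }

    // ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND (both ends included)
    static boolean betweenTenBeforeAndFiveAfter(long ltime, long rtime) {
        return ltime >= rtime - 10 * SECOND && ltime <= rtime + 5 * SECOND;
    }
}
```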
1.3.1 intervalJoin: API usage

(1) Code

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// allowed delay: 0 s
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.print("orderStream=>")
right.print("shipMentStream=>")

left
  .keyBy(0)
  .intervalJoin(right.keyBy(1))
  // between only supports event time
  // interval: each left element joins the right elements whose timestamps lie in
  // [left + 0 h, left + 4 h] (both ends inclusive by default)
  // i.e. an order joins the shipments made within 4 hours after it
  .between(Time.hours(0), Time.hours(4))
  // exclude the lower bound
  //.lowerBoundExclusive()
  // exclude the upper bound
  //.upperBoundExclusive()
  .process(new ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]() {
    override def processElement(orders: (Int, String, Long), shipments:(Int,Int,String,Long), context: ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]#Context, out: Collector[(Int,String,String,Long,Long)]): Unit = {
      // orderId, productName, shipment status, order timestamp, shipment timestamp
      out.collect( (orders._1, orders._2, shipments._3,orders._3, shipments._4))
    }
  })
  .print("IntervalJoin=>")

env.execute("IntervalJoinTest")

(2) Analysis and output

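Interval: by default (both bounds inclusive), each element of the left stream joins the right-stream elements whose timestamps fall in [left + 0h, left + 4h].

In other words, an order (leftStream) joins the shipments (rightStream) made within 4 hours after the order. Unlike in the window join of 1.2, (3,huawei) is now matched as well. Its 12:00 shipment falls into a different tumbling window from the order but is within 4 hours of it. (2,mac) is still unmatched, because its shipment comes almost 7 hours later.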
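
The between semantics can be written as a predicate and checked against the sample data. The plain-Java sketch below uses hypothetical names and is not Flink code. Its predicate is left.ts + lower <= right.ts <= left.ts + upper, and two flags model lowerBoundExclusive() and upperBoundExclusive(). With between(0 h, 4 h) it matches orders 1, 3 and 4, as in the output above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of intervalJoin(...).between(lower, upper) (not Flink code).
class IntervalJoinSketch {
    static final long HOUR = 3_600_000L;

    // True if rightTs lies in [leftTs + lower, leftTs + upper]; the flags drop the end points.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
                              boolean lowerInclusive, boolean upperInclusive) {
        long lo = leftTs + lower;
        long hi = leftTs + upper;
        boolean aboveLower = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowUpper = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLower && belowUpper;
    }

    // orders: {id, orderTime}; shipments: {orderId, shipTime} (sample data from section 1.1)
    static final long[][] ORDERS = {{1, 1585706400000L}, {2, 1585706520000L}, {3, 1585706580000L}, {4, 1585706700000L}};
    static final long[][] SHIPMENTS = {{1, 1585710000000L}, {2, 1585731600000L}, {3, 1585713600000L}, {4, 1585711800000L}};

    // Ids of orders with a same-key shipment within [order + 0 h, order + 4 h].
    static List<Long> matchedOrderIds() {
        List<Long> ids = new ArrayList<>();
        for (long[] o : ORDERS) {
            for (long[] s : SHIPMENTS) {
                if (o[0] == s[0] && inInterval(o[1], s[1], 0, 4 * HOUR, true, true)) {
                    ids.add(o[0]);
                }
            }
        }
        return ids;
    }
}
```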

Output:

IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
1.3.2 intervalJoin: SQL usage
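import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._  // Flink 1.11+; older versions: org.apache.flink.table.api.scala._

// Order, ShipMents and DateUtils.formatTime are the author's own helpers (not shown)
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
   .map(ele=>Order(ele._1,ele._2,DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
   .map(ele=>ShipMents(ele._1,ele._2,ele._3,DateUtils.formatTime(ele._4)))

val tableEnvironment = StreamTableEnvironment.create(env)

// Note (assumption, Flink 1.11+): the planner only treats this as an interval join when orderTime and
// shipTime are rowtime attributes, e.g. fromDataStream(left, $"id", $"productName", $"orderTime".rowtime);
// declared as plain columns, as below, the query runs as a regular join that keeps all rows in state
val orderTable:Table=tableEnvironment.fromDataStream(left)
val shipmentsTable:Table=tableEnvironment.fromDataStream(right)

val table: Table = tableEnvironment.sqlQuery(
  s"""
     |SELECT o.id, o.productName, s.status
     |FROM $orderTable AS o
     |JOIN $shipmentsTable AS s on o.id = s.orderId AND
     |     o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
     |""".stripMargin)

tableEnvironment.toAppendStream[(Int,String,String)](table).print("IntervalJoinTest")

env.execute()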

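Note: the SQL and API versions are written differently, but both mean that an order joins the shipments that occur within 4 hours after it (both bounds inclusive):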

o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime

orderStream
   .keyBy(0) 
   .intervalJoin(shipTimeStream.keyBy(1))
   .between(Time.hours(0), Time.hours(4))
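
Rearranging shipTime - 4h <= orderTime <= shipTime gives orderTime <= shipTime <= orderTime + 4h, which is exactly between(Time.hours(0), Time.hours(4)). The plain-Java sketch below checks this mechanically by comparing both predicates over a grid of values. Names are hypothetical and times are in minutes.

```java
// Brute-force check that the SQL and API interval conditions accept the same pairs
// (plain Java, times in minutes, hypothetical names).
class SqlApiEquivalence {
    static final long FOUR_HOURS = 4 * 60;

    // o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
    static boolean sqlCondition(long orderTime, long shipTime) {
        return orderTime >= shipTime - FOUR_HOURS && orderTime <= shipTime;
    }

    // between(0 h, 4 h): orderTime + 0 <= shipTime <= orderTime + 4 h
    static boolean apiCondition(long orderTime, long shipTime) {
        return shipTime >= orderTime && shipTime <= orderTime + FOUR_HOURS;
    }

    static boolean equivalentOnGrid(int range) {
        for (long o = -range; o <= range; o++) {
            for (long s = -range; s <= range; s++) {
                if (sqlCondition(o, s) != apiCondition(o, s)) return false;
            }
        }
        return true;
    }
}
```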

1.4 coGroup

(1) Code

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// allowed delay: 0 s
val delay = 0
// window size: 4 hours
val window = 4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

val leftJoinResult: DataStream[(Int,String,String,Long,Long)] = leftStream.
  coGroup(rightStream).where(_._1).equalTo(_._2) // left join, keyed on id (left) = orderId (right)
  .window(TumblingEventTimeWindows.of(Time.hours(window))) // tumbling window
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)  orderId productName status orderTime shiptime
  .apply(new CoGroupFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def coGroup(first: lang.Iterable[(Int,String,Long)], second: lang.Iterable[(Int, Int,String,Long)], out: Collector[(Int,String,String,Long,Long)]): Unit = {
      for (firstEle <- first.asScala) {
        var flag = false
        for (secondEle <- second.asScala) {
          // left join: a matching shipment exists in this window
          out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
          flag = true
        }
        // left join: no matching shipment in this window
        if (!flag) {
          out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
        }
      }
    }
  })

leftJoinResult.print()
env.execute()

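(2) Analysis and results

In substance this is no different from Join: both are keyed, windowed joins, and Flink in fact implements the window join on top of coGroup. The difference is the flexibility of the output. coGroup hands the function all elements of both streams for a key and window, including keys that appear on only one side, so the output can be defined freely. The code above behaves like a SQL LEFT JOIN: orders 2 and 3 have no shipment in their window and are emitted with "null" and -1.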
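
The left-join behavior of the coGroup function can be modeled in plain Java. The names are hypothetical and this is not Flink code. Within each epoch-aligned tumbling window, every order is paired with each shipment that has the same key. An order with no shipment in its window is emitted once with "null" and -1, just as the Scala CoGroupFunction above does. Unlike Flink, the sketch emits rows in input order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the coGroup left join in section 1.4 (not Flink code).
class CoGroupLeftJoinSketch {
    record Order(int id, String product, long ts) {}
    record Shipment(int orderId, String status, long ts) {}

    static final long HOUR = 3_600_000L;
    static final List<Order> ORDERS = List.of(
            new Order(1, "iphone", 1585706400000L), new Order(2, "mac", 1585706520000L),
            new Order(3, "huawei", 1585706580000L), new Order(4, "pad", 1585706700000L));
    static final List<Shipment> SHIPMENTS = List.of(
            new Shipment(1, "shipped", 1585710000000L), new Shipment(2, "delivered", 1585731600000L),
            new Shipment(3, "shipped", 1585713600000L), new Shipment(4, "shipped", 1585711800000L));

    // Start of the epoch-aligned tumbling window containing ts (offset 0).
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    static List<String> leftJoin(List<Order> orders, List<Shipment> shipments, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            boolean matched = false;
            for (Shipment s : shipments) {
                // same key (id = orderId) and same tumbling window: a joined row
                if (s.orderId() == o.id() && windowStart(s.ts(), windowSize) == windowStart(o.ts(), windowSize)) {
                    out.add("(" + o.id() + "," + o.product() + "," + s.status() + "," + o.ts() + "," + s.ts() + ")");
                    matched = true;
                }
            }
            // no shipment in this order's window: emit the order alone, like the Scala code
            if (!matched) {
                out.add("(" + o.id() + "," + o.product() + ",null," + o.ts() + ",-1)");
            }
        }
        return out;
    }
}
```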

Output:

(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)

2. intervalJoin source code walkthrough

2.1 From the between method into the implementation classes

org.apache.flink.streaming.api.datastream.KeyedStream (Java) {
  Class IntervalJoin {
    Method between {
      // IntervalJoin only supports event time
      if (timeCharacteristic != TimeCharacteristic.EventTime) {
        throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
      }
    }
  }
}

org.apache.flink.streaming.api.scala.KeyedStream (Scala) {
  Class IntervalJoin {
    // Javadoc of between: leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
    Method between {
      new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis) {
        // the lower bound is inclusive by default; this method excludes it
        Method lowerBoundExclusive { this.lowerBoundInclusive = false }
        // the upper bound is inclusive by default; this method excludes it
        Method upperBoundExclusive { this.upperBoundInclusive = false }
        // process takes the user-defined function
        Method process(processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]) {
          // the Scala process method delegates to the Java process method
          asScalaStream(javaJoined.process(processJoinFunction, outType)) {
            // in the Java process method, the parts to focus on are IntervalJoinOperator and the connect of the two streams
            SingleOutputStreamOperator<OUT> process {
              // [Key point 1]
              // "A TwoInputStreamOperator to execute time-bounded stream inner joins."
              operator = new IntervalJoinOperator<>(...);

              // [Key point 2]
              return left
                // connect the two KeyedStreams into ConnectedStreams so that the two streams can share state;
                // for intervalJoin, this lets records with the same key from both streams see each other
                .connect(right)
                // "Assigns keys to the elements"; returns the partitioned ConnectedStreams
                .keyBy(keySelector1, keySelector2)
                // creates the transformed output stream
                .transform("Interval Join", outputType, operator);
            }
          }
        }
      }
    }
  }
}
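
A simplified plain-Java model illustrates the purpose of connect(...).keyBy(keySelector1, keySelector2). Both inputs are routed by the same function of their key. Equal keys from the two streams therefore meet on the same parallel instance, and a purely local join there finds every pair. Flink actually assigns keys to key groups with a murmur hash (see KeyGroupRangeAssignment). Using hashCode modulo the parallelism here is a simplifying assumption, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink's partitioner) of connect(...).keyBy(keySelector1, keySelector2).
class CoPartitionSketch {
    // Same routing function for both inputs; Flink uses murmur-hashed key groups instead.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Each partition joins only the records routed to it. Because both inputs use the
    // same routing, every pair of equal keys is found locally.
    static List<String> localJoin(List<Integer> leftKeys, List<Integer> rightKeys, int parallelism) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            for (int l : leftKeys) {
                if (partitionFor(l, parallelism) != p) continue;
                for (int r : rightKeys) {
                    if (partitionFor(r, parallelism) == p && l == r) {
                        out.add("partition " + p + ": key " + l);
                    }
                }
            }
        }
        return out;
    }
}
```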

2.2 A closer look at key point 1: IntervalJoinOperator

class IntervalJoinOperator {
  // Each side's state is a MapState, which is a kind of keyed state; think of it as a local cache.
  // There is one MapState per stream: the Long key is the record timestamp, and List<BufferEntry> holds
  // all records with that timestamp (a BufferEntry has two fields, element and hasBeenJoined)
  private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
  private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

  // initialize the MapStates
  Method initializeState {
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ));
  }

  // processElement1 handles the left stream; like processElement2, it calls processElement
  Method processElement1 {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
  }
  // processElement2 handles the right stream, with the bounds mirrored to [-upperBound, -lowerBound]
  Method processElement2(StreamRecord<T2> record) throws Exception {
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
  }

  // When a record arrives on one stream (say the left), look up the other stream's buffer for
  // records within the bounds. processElement1 and processElement2 both end up here.
  Method processElement {
    // the record's value
    final THIS ourValue = record.getValue();
    // the record's timestamp
    final long ourTimestamp = record.getTimestamp();

    // the timestamp must be meaningful; in practice this means event time
    if (ourTimestamp == Long.MIN_VALUE) {
      throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
          "interval stream joins need to have timestamps meaningful timestamps.");
    }

    // if the record is late, return immediately without processing it
    if (isLate(ourTimestamp)) {
      // the record's timestamp is below the current watermark: it is late and is ignored
      return;
    }

    // add the record to this stream's MapState, marked as not yet joined (false)
    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    // iterate over the other stream's MapState: otherBuffer
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
      // timestamp of the other stream's records
      final long timestamp = bucket.getKey();

      // join only if the other timestamp satisfies
      // ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound;
      // otherwise skip this bucket
      if (timestamp < ourTimestamp + relativeLowerBound ||
          timestamp > ourTimestamp + relativeUpperBound) {
        continue;
      }

      // take the other stream's values and run the user-defined function on each pair
      // (collect uses the larger of the two timestamps as the output timestamp)
      for (BufferEntry<OTHER> entry: bucket.getValue()) {
        if (isLeft) {
          // this record comes from the left stream
          collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
        } else {
          // this record comes from the right stream
          collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
        }
      }
    }
    // cleanup time for this record
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

    // for how the state is cleaned up, see onEventTime below.
    // The timer fires at ourTimestamp + relativeUpperBound (when positive); once the watermark reaches
    // that time, the record is removed. Adding relativeUpperBound keeps the record in state for as long
    // as the other stream may still need it.
    if (isLeft) {
      // left stream: register the cleanup timer
      internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
      // right stream: register the cleanup timer
      internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
  }

  // whether a record is late
  Method boolean isLate(long timestamp) {
    // the current event-time watermark; watermarks only increase
    long currentWatermark = internalTimerService.currentWatermark();
    // true if the timestamp is below currentWatermark: the record is late, so it is neither
    // buffered nor joined with the other stream
    return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
  }

  // add a record to its stream's MapState
  Method void addToBuffer {
    List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
    if (elemsInBucket == null) {
      elemsInBucket = new ArrayList<>();
    }
    // mark the record as not yet joined (false): new BufferEntry<>(value, false)
    elemsInBucket.add(new BufferEntry<>(value, false));
    buffer.put(timestamp, elemsInBucket);
  }

  // collect: the larger of the two timestamps becomes the output timestamp, then the
  // user's ProcessJoinFunction.processElement is called
  Method collect {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

    userFunction.processElement(left, right, context, collector);
  }

  // timer callback: once the watermark reaches a registered cleanup time, remove the buffered records
  Method onEventTime(InternalTimer<K, String> timer) {

    // the timer's timestamp is the cleanup time registered above (not the record's own timestamp)
    long timerTimestamp = timer.getTimestamp();

    String namespace = timer.getNamespace();

    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
      // Example: a record with timestamp 10s arrives, and between(1s, 5s) was used: lowerBound = 1s, upperBound = 5s.
      // Meaning: a left record joins right records in [left + 1s, left + 5s], i.e. left ts + 1s <= right ts <= left ts + 5s;
      //          a right record joins left records in [right - 5s, right - 1s], i.e. right ts - 5s <= left ts <= right ts - 1s.

      // [Key] cleanup time registered for the record:
      // long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

      // Left state: cleanupTime = 10s + 5s = 15s, i.e. at 15s the left record with timestamp 10s can be removed
      case CLEANUP_NAMESPACE_LEFT: {
        // left ts + 1s <= right ts <= left ts + 5s; lowerBound = 1s, upperBound = 5s.
        // A left record goes through processElement1 with relativeUpperBound = 5 > 0, so timerTimestamp = 10 + 5 = 15s,
        // and the removed key is timerTimestamp - upperBound = 15s - 5s = 10s.
        // Once the watermark reaches 15s, later right records R have timestamps above 15s and search left
        // timestamps in [R - 5s, R - 1s], which starts above 10s, so the left record at 10s is no longer needed.
        long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
        logger.trace("Removing from left buffer @ {}", timestamp);
        leftBuffer.remove(timestamp);
        break;
      }

      // Right state: cleanupTime = ourTimestamp = 10s, i.e. at 10s the right record with timestamp 10s can be removed
      case CLEANUP_NAMESPACE_RIGHT: {
        // right ts - 5s <= left ts <= right ts - 1s; relativeLowerBound = -5, relativeUpperBound = -1.
        // A right record goes through processElement2 with relativeUpperBound = -1 <= 0, so timerTimestamp = 10s.
        // Here lowerBound is the operator's original lower bound (1s), not relativeLowerBound (-5s);
        // since lowerBound > 0, the removed key is timerTimestamp = 10s (not 10s - 5s = 5s).
        // Once the watermark reaches 10s, later left records L have timestamps above 10s and search right
        // timestamps in [L + 1s, L + 5s], which starts above 11s, so the right record at 10s is no longer needed.
        long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
        logger.trace("Removing from right buffer @ {}", timestamp);
        rightBuffer.remove(timestamp);
        break;
      }
      default:
        throw new RuntimeException("Invalid namespace " + namespace);
    }
  }
}
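
The operator's behavior for a single key can be reproduced in a compact plain-Java model. The names are hypothetical and this is not Flink code:
- HashMaps stand in for the two MapState buffers.
- A TreeMap stands in for the timer service.
- advanceWatermark plays the role of onEventTime.

The bounds, the mirrored right-side bounds, the isLate check and both cleanup formulas follow the code above. Output timestamps, the hasBeenJoined flag and BufferEntry are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-key model of IntervalJoinOperator (plain Java, not Flink code).
class MiniIntervalJoin {
    final long lowerBound;
    final long upperBound;
    // timestamp -> records with that timestamp (stand-ins for the leftBuffer / rightBuffer MapState)
    final Map<Long, List<String>> leftBuffer = new HashMap<>();
    final Map<Long, List<String>> rightBuffer = new HashMap<>();
    // cleanup time -> sides ("L" or "R") whose timer fires then (stand-in for the timer service)
    final TreeMap<Long, List<String>> timers = new TreeMap<>();
    long watermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    MiniIntervalJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {   // processElement1
        process(value, ts, true);
    }

    void processRight(String value, long ts) {  // processElement2: bounds mirrored
        process(value, ts, false);
    }

    private void process(String value, long ts, boolean isLeft) {
        // isLate: records below the current watermark are dropped
        if (watermark != Long.MIN_VALUE && ts < watermark) return;
        long relLower = isLeft ? lowerBound : -upperBound;
        long relUpper = isLeft ? upperBound : -lowerBound;
        Map<Long, List<String>> ours = isLeft ? leftBuffer : rightBuffer;
        Map<Long, List<String>> other = isLeft ? rightBuffer : leftBuffer;
        // addToBuffer
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        // scan the other buffer for timestamps in [ts + relLower, ts + relUpper]
        for (Map.Entry<Long, List<String>> bucket : other.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) continue;
            for (String o : bucket.getValue()) {
                output.add(isLeft ? value + "|" + o : o + "|" + value);
            }
        }
        // register the cleanup timer
        long cleanupTime = relUpper > 0 ? ts + relUpper : ts;
        timers.computeIfAbsent(cleanupTime, k -> new ArrayList<>()).add(isLeft ? "L" : "R");
    }

    // Advancing the watermark fires every timer whose time is <= the watermark (onEventTime).
    void advanceWatermark(long wm) {
        watermark = wm;
        while (!timers.isEmpty() && timers.firstKey() <= wm) {
            Map.Entry<Long, List<String>> timer = timers.pollFirstEntry();
            long timerTs = timer.getKey();
            for (String side : timer.getValue()) {
                if (side.equals("L")) {
                    leftBuffer.remove(upperBound <= 0 ? timerTs : timerTs - upperBound);
                } else {
                    rightBuffer.remove(lowerBound <= 0 ? timerTs + lowerBound : timerTs);
                }
            }
        }
    }
}
```

As an example, take new MiniIntervalJoin(1, 5) with times in seconds:
1. A left record at 10 joins a right record at 12.
2. Advancing the watermark to 15 fires the timers at 12 (right) and 15 (left), which removes those two records.
3. A left record at 9 is then dropped as late.
4. A left record at 16 still joins a right record at 20.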

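2.3 State cleanup in detail

2.3.1 State cleanup time: cleanupTime
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
2.3.2 Performing the cleanup: buffer.remove(timestamp)
  1. Assumption: a record with timestamp 10s arrives, and between was given 1s and 5s, so lowerBound = 1s and upperBound = 5s.
  2. Meaning: a left record joins the right records in [left + 1s, left + 5s], i.e. left ts + 1s <= right ts <= left ts + 5s; a right record joins the left records in [right - 5s, right - 1s], i.e. right ts - 5s <= left ts <= right ts - 1s.
  3. When a left record with timestamp 10s arrives: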

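(1) left ts + 1s <= right ts <= left ts + 5s; lowerBound = 1s, upperBound = 5s.

(2) A left record goes through processElement1, where relativeUpperBound = upperBound = 5s > 0, so the registered cleanup time (later the timer's timerTimestamp) is 10s + 5s = 15s.

long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

(3) When the timer fires, onEventTime handles CLEANUP_NAMESPACE_LEFT. Since upperBound = 5s > 0, the key removed from the left buffer is timerTimestamp - upperBound = 15s - 5s = 10s.

long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
    logger.trace("Removing from left buffer @ {}", timestamp);
    leftBuffer.remove(timestamp);

(4) Conclusion:

The timer fires once the watermark reaches 15s, and the left record with timestamp 10s is removed. This is safe because, by the watermark contract, every right record arriving afterwards has a timestamp R above 15s. Such a record searches left timestamps in [R - 5s, R - 1s], whose lower end is above 10s.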

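  4. When a right record with timestamp 10s arrives:

(1) right ts - 5s <= left ts <= right ts - 1s; here relativeLowerBound = -5s and relativeUpperBound = -1s.

(2) A right record goes through processElement2, where relativeUpperBound = -lowerBound = -1s, which is not > 0, so the registered cleanup time (the timer's timerTimestamp) is ourTimestamp = 10s.

long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

(3) When the timer fires, onEventTime handles CLEANUP_NAMESPACE_RIGHT. The check uses the operator's original lowerBound (1s), not relativeLowerBound (-5s). Since lowerBound > 0, the removed key is timerTimestamp = 10s. The right record at 10s is removed, not one at 10s - 5s = 5s.

long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);

(4) Conclusion

The timer fires once the watermark reaches 10s, and the right record with timestamp 10s is removed. Every left record arriving afterwards has a timestamp L above 10s and searches right timestamps in [L + 1s, L + 5s], whose lower end is above 11s, so the right record at 10s can no longer be matched.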
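
The two formulas can be checked together in a plain-Java sketch. The names are hypothetical and this is not Flink code. For any bounds, the key removed in onEventTime is the record's own timestamp: the offset that is added when the timer is registered is subtracted again when the timer fires.

```java
// The cleanup formulas of IntervalJoinOperator restated in plain Java (hypothetical names).
class CleanupMath {
    // processElement: relativeUpperBound is upperBound on the left side, -lowerBound on the right side.
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0 ? ts + relativeUpperBound : ts;
    }

    // onEventTime, CLEANUP_NAMESPACE_LEFT
    static long leftRemovalKey(long timerTs, long upperBound) {
        return upperBound <= 0 ? timerTs : timerTs - upperBound;
    }

    // onEventTime, CLEANUP_NAMESPACE_RIGHT: uses the original lowerBound, not relativeLowerBound
    static long rightRemovalKey(long timerTs, long lowerBound) {
        return lowerBound <= 0 ? timerTs + lowerBound : timerTs;
    }
}
```

For the example above, between(1s, 5s) and a record at 10s, the left side registers 15s and removes key 10s. The right side registers 10s and also removes key 10s.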

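2.4 What to take away from the source code

2.4.1 State is stored in MapState

The state is a MapState, a kind of keyed state. Think of it as a local cache that holds the buffered records of each stream. Its type is MapState<Long, List<BufferEntry<T>>>: the Long key is the record timestamp, and the List holds all records with that timestamp (each BufferEntry has two fields, element and hasBeenJoined).

2.4.2 State cleanup time

Left-stream cleanup time = ourTimestamp (the record's event time) + upperBound (the upper bound of the interval) if upperBound > 0, otherwise ourTimestamp.

Right-stream cleanup time = ourTimestamp (the record's event time) - lowerBound if lowerBound < 0, otherwise ourTimestamp (the case in the example above, where lowerBound = 1s).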

3. Differences between the three joins and their use cases

References

Differences between Flink DataStream Join, IntervalJoin and coGroup
https://blog.csdn.net/qq_33689414/article/details/93875881

(Alibaba Cloud Realtime Compute for Flink) IntervalJoin statement
https://help.aliyun.com/document_detail/195298.html

(Principles) Apache Flink Talk Series: Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache Flink 漫谈系列 - Time Interval JOIN/#more

(State cleanup) Flink 1.11 intervalJoin: watermark generation and the state cleanup mechanism, source code and demo analysis
https://blog.csdn.net/qq_34864753/article/details/111183556

(Source code analysis) Flink Interval Join usage and internals
https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242

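4. Joining multiple streams

The sections above covered joining two streams, but real scenarios may involve joining four or even six streams. How can that be implemented?

4.1 Scenario 1: several streams that are updated infrequently must be joined into one table in real time (joining several dimension tables into one dimension table)

Implementation steps:

1. Use Canal to sync the MySQL binlog to Kafka in real time, producing one stream per table.

2. Asynchronously join the stream of table A with the other tables (B, C, D) in the source MySQL database, and apply the corresponding inserts, updates and deletes to table E.

Prerequisites:

(1) The four MySQL tables are updated infrequently. If they were updated frequently, asynchronous lookups against MySQL might not sustain the required QPS.

(2) Which of tables B, C and D table A joins depends on the case: only the tables whose changes cause inserts, updates or deletes in table E need to be joined.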
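
The asynchronous lookup in step 2 can be sketched in plain Java with CompletableFuture. This is not Flink code: in-memory maps stand in for MySQL tables B, C and D, and every name and value is hypothetical. In a Flink job the same pattern is usually written with AsyncDataStream and a RichAsyncFunction, so that slow lookups do not block the stream.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Async enrichment of table A's records with tables B, C and D (plain Java, hypothetical data).
class AsyncLookupSketch {
    static final Map<Integer, String> TABLE_B = Map.of(1, "b1", 2, "b2");
    static final Map<Integer, String> TABLE_C = Map.of(1, "c1");
    static final Map<Integer, String> TABLE_D = Map.of(1, "d1", 2, "d2");

    // One non-blocking lookup; a missing row becomes "null".
    static CompletableFuture<String> lookup(Map<Integer, String> table, int key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> table.getOrDefault(key, "null"), pool);
    }

    // Issues the three lookups for one A record concurrently and combines them into a row for table E.
    static CompletableFuture<String> enrich(int aKey, ExecutorService pool) {
        CompletableFuture<String> b = lookup(TABLE_B, aKey, pool);
        CompletableFuture<String> c = lookup(TABLE_C, aKey, pool);
        CompletableFuture<String> d = lookup(TABLE_D, aKey, pool);
        return CompletableFuture.allOf(b, c, d)
                .thenApply(v -> aKey + "," + b.join() + "," + c.join() + "," + d.join());
    }

    // Starts all enrichments first, then waits for them in input order.
    static List<String> enrichAll(List<Integer> aKeys) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = aKeys.stream()
                    .map(k -> enrich(k, pool))
                    .collect(Collectors.toList());
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```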

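4.2 Joining two streams (fact table with dimension table)

Implementation steps:

1. Use Canal to sync the MySQL binlog to Kafka in real time as the stream of fact table A.

2. Use Canal to sync MySQL dimension table B to Phoenix in real time.

3. Asynchronously join stream A from Kafka with dimension table B in Phoenix, and write the result to result table C in Phoenix.

Note:

(1) Dimension table B is synced to Phoenix because the lookup QPS on it is high ("dimension table" is meant in a broad sense here). If the QPS is low, dimension table B in MySQL can be queried directly.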

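4.3 Joining two fact tables (without TimeWindowJoin)

Joining two real-time fact tables with a time-window join means keeping the data in Flink state, which is keyed state as section 2 shows for IntervalJoin. Here, third-party storage (Phoenix) is used instead. A further weakness of IntervalJoin shows up when one stream lags: if the lag exceeds how long the other stream's records are kept in state, data is lost, because late records are dropped (see isLate in section 2.2). This design solves the problem with a coGroup join combined with side output.

Implementation steps:

1. Use Canal to sync the MySQL binlog to Kafka in real time as the streams of fact tables A and B.

2. coGroup stream A with stream B.

3. Route late records of stream A to a side output (OutputTag) and join them asynchronously with table B through an API or the database. Likewise, route late records of stream B to a side output and join them asynchronously with table A.

4. UNION all the streams and write the result to Phoenix table C.

Note:

The difference from IntervalJoin is that late records do not depend on Flink state. They are pulled out through a side output (OutputTag) and joined asynchronously with the data in MySQL.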
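
The routing in steps 2 to 4 can be modeled in plain Java. The names are hypothetical and this is not Flink code. A record counts as late by the same rule as isLate in section 2.2: its timestamp is below the current watermark. Late records take the side-output path and are joined with a map that stands in for the database. On-time records are passed to the coGroup path, which is not modeled here. Finally, both paths are unioned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Late-record routing for section 4.3 (plain Java, hypothetical names; the coGroup itself is not modeled).
class LateRoutingSketch {
    record Event(int key, long ts, String value) {}

    static List<String> route(List<Event> events, long watermark, Map<Integer, String> db) {
        List<String> onTime = new ArrayList<>();
        List<String> late = new ArrayList<>();
        for (Event e : events) {
            if (e.ts() < watermark) {
                // side-output path: join with the other table's row fetched from the database
                late.add(e.value() + "|" + db.getOrDefault(e.key(), "null"));
            } else {
                // main path: would be handed to the windowed coGroup
                onTime.add(e.value());
            }
        }
        // UNION of both paths
        List<String> union = new ArrayList<>(onTime);
        union.addAll(late);
        return union;
    }
}
```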
