Flink SQL中可撤回机制解密

2022-04-18 12:57:04 浏览数 (1)

场景案例

先从一个实际业务场景理解Flink SQL中的撤回机制:设备状态上线/下线数量统计,上游采集设备状态发送到Kafka中,最开始是一个上线状态,此时统计到上线数量 1,过了一段时间该设备下线了,收到的下线的状态,那么此时应该是上线数量-1,下线数量 1,现在需要实现这样一个需求,看一下在Flink SQL里面如何实现

代码语言:javascript复制
val env=StreamExecutionEnvironment.getExecutionEnvironment

    val tabEnv=TableEnvironment.getTableEnvironment(env)

    tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



    val kafkaConfig=newProperties()

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")



    val consumer=newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema,kafkaConfig)

    val ds=env.addSource(consumer)

.map(x=>{

        val a=x.split(",")

DevData(a(0),a(1).toInt,a(2).toLong)

}).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[DevData](Time.milliseconds(1000)){

overridedef extractTimestamp(element:DevData):Long= element.times

})



    tabEnv.registerDataStream("tbl1",ds,'devId,'status,'times,'rt.rowtime)

    val dw=tabEnv.sqlQuery(

"""

        select st,count(*) from (

                select latestTimeUdf(status,times) st,devId from tbl1 group by devId

                ) a group by st

      """.stripMargin)

    dw.writeToSink(newPaulRetractStreamTableSink)

        env.execute()
 

自定义udf获取最新的设备状态

代码语言:javascript复制
publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{



@OverridepublicTimeAndStatus createAccumulator(){

returnnewTimeAndStatus();

}



publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){

if(time > acc.getTimes()){

            acc.setStatus(status);

            acc.setTimes(time);

}

}



@OverridepublicInteger getValue(TimeAndStatus timeAndStatus){

return timeAndStatus.getStatus();

}

}
 

看一组测试数据: 输入数据 dev1,1,1574347472000 得到结果:

代码语言:javascript复制
2>(true,1,1) 

继续输入dev1,0,1574347474000 得到结果:

代码语言:javascript复制
2>(false,1,1)//撤回

2>(true,0,1)

第二条数据输入dev1新的状态数据,导致最后结果的变更。

源码分析

首先分析一下上述得到结果编码Ture或者False是如何确定的: 内部sql1: select latestTimeUdf(times,status) st,devId from tbl1 group by devId,这是一个聚合操作,目的是求出设备当前的状态; 对于外部sql2: select st,count(*) from (sql1) a group by st,同样是一个聚合操作,用于求出不同状态对应的设备数量 输入第一条数据dev1,1,1574347472000对于sql1 来说会产生(true,dev1,1) 的结果,sql2 接受到该结果生成(true,1,1) 就是是结果数据;接着输入第二条数据dev1,0,1574347474000 ,由于dev1的设备状态发生变更,sql1首先发送一条撤回数据(false,dev1,1),sql2收到该条数据判断是撤回数据会将之前的结果撤回产生一条(false,1,1)的数据,sql1同时还会产生一条(true,dev1,0) dev1当前的最新状态,sql2收到该条数据重新计算得到(true,0,1)

那么关于这一整套逻辑在Flink中是如何实现的?代码入口是:DataStreamGroupAggregate 聚合操作的物理执行计划,另外说明在table/SQL api里面数据流动的格式是CRow,包含两个字段:一个是Boolean类型,表示是否是撤回,另外一个是Row类型,真正的数据。

  1. 具体的执行逻辑是通过其translateToPlan来生成,通过AggregateUtil.createGroupAggregateFunction方法动态生成具体的Function,在生成Function 会判断上游消费的数据是否是可撤回来决定是否生成retract方法,比喻说sql1上游是消费kafka 非撤回流,所以在定义LatestTimeUdf 并没有定义retract,sql2 消费sql1的输出,sql1会产生可撤回消息,那么在其内部会生成retract方法,这部分是代码自动生成的
  2. 生成的Function被GroupAggProcessFunction包装,最主要的就是这里面processElement方法的逻辑
  • registerProcessingCleanupTimer注册状态的过期时间,过期配置通过StreamQueryConfig获取,后面定时触发会调用onTimer方法
代码语言:javascript复制
val currentTime = ctx.timerService().currentProcessingTime()

// register state-cleanup timer

registerProcessingCleanupTimer(ctx, currentTime)
 
  • state 存储中间结果状态、cntState存储流入对应key数量,获取当前中间结果accumulators,如果为空则,通过createAccumulators创建,获取当前对应key数量inputCnt,如果为空,则初始化为0
代码语言:javascript复制
val input = inputC.row

// get accumulators and input counter

var accumulators = state.value()

var inputCnt = cntState.value()

if(null== accumulators){

      firstRow =true

      accumulators =function.createAccumulators()

}else{

      firstRow =false

}

if(null== inputCnt){

      inputCnt =0L

}
 
  • newRow/prevRow 分别对应新产生结果(撤回标识True)与之前的结果(撤回标识False),setForwardedFields 设置输出的key, setAggregationResults将之前的结果设置到prevRow中
代码语言:javascript复制
// Set group keys value to the final output

function.setForwardedFields(input, newRow.row)

function.setForwardedFields(input, prevRow.row)

// Set previous aggregate result to the prevRow

function.setAggregationResults(accumulators, prevRow.row)
 
  • 如果输入的是insert即True, 则inputCnt 1, 调用accumulate 将当前流入数据添加到中间结果accumulators中得到新的结果,调用setAggregationResults设置新的结果到newRow结果中, 如果输入的是retract即False, 则inputCnt-1,调用retract从accumulators撤回当前的输入得到新的结果,调用setAggregationResults设置新的结果到newRow结果中
代码语言:javascript复制
// update aggregate result and set to the newRow

if(inputC.change){

      inputCnt  =1

// accumulate input

function.accumulate(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)

}else{

      inputCnt -=1

// retract input

function.retract(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)

}
 
  • 如果当前的inputCnt!=0, 表明当前中间状态还有数据,那么就更新当前state/cntState, 接下来判断是否发送撤回数据,如果当前没有中间状态,那么就表示需要撤回之前的数据,然后清空状态
代码语言:javascript复制
if(inputCnt !=0){

// we aggregated at least one record for this key



// update the state

      state.update(accumulators)//更新状态操作

      cntState.update(inputCnt)



// if this was not the first row

if(!firstRow){

if(prevRow.row.equals(newRow.row)&&!stateCleaningEnabled){

//如果处理前后的结果是一致的并且也没有TTL那么就没有发送一条数据到下游,

//这里前后一致不发送很好理解,假如说有ttl, 那也是需要针对下游需要做状态过期时间的更新

return

}else{

// retract previous result

if(generateRetraction){//是否生成撤回数据

out.collect(prevRow)

}

}

}

// emit the new result

out.collect(newRow)//发出新的结果



}else{

// we retracted the last record for this key

// sent out a delete message

out.collect(prevRow)

// and clear all state

      state.clear()//清空状态

      cntState.clear()

}
 

总结

总的来说撤回机制是需要状态、撤回操作的支持,状态是为了保存当前的数据,下次如果需要发生撤回,就将该数据发出去,撤回操作可以理解为function里面的retract方法,能够支持这个数据撤回的计算操作。

0 人点赞