场景案例
先从一个实际业务场景理解Flink SQL中的撤回机制:设备状态上线/下线数量统计,上游采集设备状态发送到Kafka中,最开始是一个上线状态,此时统计到上线数量 1,过了一段时间该设备下线了,收到的下线的状态,那么此时应该是上线数量-1,下线数量 1,现在需要实现这样一个需求,看一下在Flink SQL里面如何实现
代码语言:javascript复制val env=StreamExecutionEnvironment.getExecutionEnvironment
val tabEnv=TableEnvironment.getTableEnvironment(env)
tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val kafkaConfig=newProperties()
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
val consumer=newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema,kafkaConfig)
val ds=env.addSource(consumer)
.map(x=>{
val a=x.split(",")
DevData(a(0),a(1).toInt,a(2).toLong)
}).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[DevData](Time.milliseconds(1000)){
overridedef extractTimestamp(element:DevData):Long= element.times
})
tabEnv.registerDataStream("tbl1",ds,'devId,'status,'times,'rt.rowtime)
val dw=tabEnv.sqlQuery(
"""
select st,count(*) from (
select latestTimeUdf(status,times) st,devId from tbl1 group by devId
) a group by st
""".stripMargin)
dw.writeToSink(newPaulRetractStreamTableSink)
env.execute()
自定义udf获取最新的设备状态
代码语言:javascript复制publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{
@OverridepublicTimeAndStatus createAccumulator(){
returnnewTimeAndStatus();
}
publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){
if(time > acc.getTimes()){
acc.setStatus(status);
acc.setTimes(time);
}
}
@OverridepublicInteger getValue(TimeAndStatus timeAndStatus){
return timeAndStatus.getStatus();
}
}
看一组测试数据:
输入数据 dev1,1,1574347472000
得到结果:
2>(true,1,1)
继续输入dev1,0,1574347474000
得到结果:
2>(false,1,1)//撤回
2>(true,0,1)
第二条数据输入dev1新的状态数据,导致最后结果的变更。
源码分析
首先分析一下上述得到结果编码Ture或者False是如何确定的:
内部sql1: select latestTimeUdf(times,status) st,devId from tbl1 group by devId,这是一个聚合操作,目的是求出设备当前的状态;
对于外部sql2: select st,count(*) from (sql1) a group by st,同样是一个聚合操作,用于求出不同状态对应的设备数量
输入第一条数据dev1,1,1574347472000
对于sql1 来说会产生(true,dev1,1)
的结果,sql2 接受到该结果生成(true,1,1)
就是是结果数据;接着输入第二条数据dev1,0,1574347474000
,由于dev1的设备状态发生变更,sql1首先发送一条撤回数据(false,dev1,1)
,sql2收到该条数据判断是撤回数据会将之前的结果撤回产生一条(false,1,1)
的数据,sql1同时还会产生一条(true,dev1,0)
dev1当前的最新状态,sql2收到该条数据重新计算得到(true,0,1)
那么关于这一整套逻辑在Flink中是如何实现的?代码入口是:DataStreamGroupAggregate 聚合操作的物理执行计划,另外说明在table/SQL api里面数据流动的格式是CRow,包含两个字段:一个是Boolean类型,表示是否是撤回,另外一个是Row类型,真正的数据。
- 具体的执行逻辑是通过其translateToPlan来生成,通过AggregateUtil.createGroupAggregateFunction方法动态生成具体的Function,在生成Function 会判断上游消费的数据是否是可撤回来决定是否生成retract方法,比喻说sql1上游是消费kafka 非撤回流,所以在定义LatestTimeUdf 并没有定义retract,sql2 消费sql1的输出,sql1会产生可撤回消息,那么在其内部会生成retract方法,这部分是代码自动生成的
- 生成的Function被GroupAggProcessFunction包装,最主要的就是这里面processElement方法的逻辑
- registerProcessingCleanupTimer注册状态的过期时间,过期配置通过StreamQueryConfig获取,后面定时触发会调用onTimer方法
val currentTime = ctx.timerService().currentProcessingTime()
// register state-cleanup timer
registerProcessingCleanupTimer(ctx, currentTime)
- state 存储中间结果状态、cntState存储流入对应key数量,获取当前中间结果accumulators,如果为空则,通过createAccumulators创建,获取当前对应key数量inputCnt,如果为空,则初始化为0
val input = inputC.row
// get accumulators and input counter
var accumulators = state.value()
var inputCnt = cntState.value()
if(null== accumulators){
firstRow =true
accumulators =function.createAccumulators()
}else{
firstRow =false
}
if(null== inputCnt){
inputCnt =0L
}
- newRow/prevRow 分别对应新产生结果(撤回标识True)与之前的结果(撤回标识False),setForwardedFields 设置输出的key, setAggregationResults将之前的结果设置到prevRow中
// Set group keys value to the final output
function.setForwardedFields(input, newRow.row)
function.setForwardedFields(input, prevRow.row)
// Set previous aggregate result to the prevRow
function.setAggregationResults(accumulators, prevRow.row)
- 如果输入的是insert即True, 则inputCnt 1, 调用accumulate 将当前流入数据添加到中间结果accumulators中得到新的结果,调用setAggregationResults设置新的结果到newRow结果中, 如果输入的是retract即False, 则inputCnt-1,调用retract从accumulators撤回当前的输入得到新的结果,调用setAggregationResults设置新的结果到newRow结果中
// update aggregate result and set to the newRow
if(inputC.change){
inputCnt =1
// accumulate input
function.accumulate(accumulators, input)
function.setAggregationResults(accumulators, newRow.row)
}else{
inputCnt -=1
// retract input
function.retract(accumulators, input)
function.setAggregationResults(accumulators, newRow.row)
}
- 如果当前的inputCnt!=0, 表明当前中间状态还有数据,那么就更新当前state/cntState, 接下来判断是否发送撤回数据,如果当前没有中间状态,那么就表示需要撤回之前的数据,然后清空状态
if(inputCnt !=0){
// we aggregated at least one record for this key
// update the state
state.update(accumulators)//更新状态操作
cntState.update(inputCnt)
// if this was not the first row
if(!firstRow){
if(prevRow.row.equals(newRow.row)&&!stateCleaningEnabled){
//如果处理前后的结果是一致的并且也没有TTL那么就没有发送一条数据到下游,
//这里前后一致不发送很好理解,假如说有ttl, 那也是需要针对下游需要做状态过期时间的更新
return
}else{
// retract previous result
if(generateRetraction){//是否生成撤回数据
out.collect(prevRow)
}
}
}
// emit the new result
out.collect(newRow)//发出新的结果
}else{
// we retracted the last record for this key
// sent out a delete message
out.collect(prevRow)
// and clear all state
state.clear()//清空状态
cntState.clear()
}
总结
总的来说撤回机制是需要状态、撤回操作的支持,状态是为了保存当前的数据,下次如果需要发生撤回,就将该数据发出去,撤回操作可以理解为function里面的retract方法,能够支持这个数据撤回的计算操作。