Flink SQL自定义聚合函数

2022-04-18 12:59:51 浏览数 (1)

本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法、撤回定义以及与源码结合分析每个方法的调用位置。

基本使用

Flink Table/SQL Api中自带了一些常见的聚合函数,例如sum、min、max等,但是在实际开发中需要自定义符合业务需求的聚合函数,先从一个实际案例入手:设备随时上报状态,现在需要求出设备的当前最新状态。分析:设备上报状态会产生多条数据,现在只需要最新的状态数据即可,很明显这是多对一的聚合类型的操作,聚合逻辑是每次保留设备的最新状态与时间,下次设备上报数据时间与保留的数据时间进行比较,如果比其大则更新。实现代码如下:

代码语言:javascript复制
publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{



@OverridepublicTimeAndStatus createAccumulator(){

returnnewTimeAndStatus();

}



publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){

if(time > acc.getTimes()){

            acc.setStatus(status);

            acc.setTimes(time);

}

}



@OverridepublicInteger getValue(TimeAndStatus timeAndStatus){

return timeAndStatus.getStatus();

}

}
 

在Flink Table/SQL Api中自定义聚合函数需要继承AggregateFunction<T,ACC>, 其中T表示自定义函数返回的结果类型,在这里返回的是Integer 表示状态标识,ACC表示聚合的中间结果类型,这个表示TimeAndStatus存放时间与状态数据,该函数有两个指定该类型的方法getAccumulatorType与getResultType返回的都是TypeInformation类型,如果我们的T或者ACC是复杂类型Flink不能自动抽取的则需要手动指定。其每个方法定义如下:

  • createAccumulator 表示创建一个中间结果数据,由于是以设备为维度那么对于每一个设备都会调用一次该方法;
  • accumulate 表示将流入的数据聚合到createAccumulator创建的中间结果数据中,第一个参数表示的是ACC类型的中间结果数据,其他的表示自定义函数的入参,该方法可以接受不同类型、个数的入参,也就是该方法可以被重载,Flink会自动根据类型提取找到合适的方法。在这里接受的是Integer类型的设备状态与long类型的时间戳,处理逻辑就是与中间结果数据时间进行比较,如果比其大则将流入的时间与设备状态更新到中间结果中。另外在做一点补充accumulate的调用是相同维度的调用,即acc每次都是该维度的中间结果数据,入参也是该维度的数据;
  • getValue 表示一次返回的结果,结果数据从acc中获取,这个方法在accumulate之后被调用。

对于自定义聚合函数来说至少需要createAccumulator、accumulate、getValue这三个方法,并且这三个方法是public 、not static的类型。接下来就可以直接注册然后使用:

代码语言:javascript复制
 tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())

val dw=tabEnv.sqlQuery(

"""select latestTimeUdf(status,times) st,devId from tbl1 group by devId

      """.stripMargin)

撤回定义

撤回机制对于Flink来说是一个很重要的特性,在Flink SQL中可撤回机制解密中详细分析了撤回的实现,其中retract是一个不可或缺的环节,其表示具体的回撤操作,对于自定义聚合函数,如果其接受到的是撤回流那么就必须实现该方法,看下其定义:

代码语言:javascript复制
publicvoid retract(ACC accumulator,[user defined inputs])

}
 

accumulator表示对应维度的中间结果数据,流入的数据表示需要撤回的数据,该方法表示将需要撤回的数据从中间结果中去除掉,Flink中默认实现了一些撤回的函数,例如SumWithRetractAggFunction:

代码语言:javascript复制
def retract(acc:SumWithRetractAccumulator[T], value:Any):Unit={

if(value !=null){

      val v = value.asInstanceOf[T]

      acc.f0 = numeric.minus(acc.f0, v)

      acc.f1 -=1

}

}
 

表示求和类的撤回,将需要撤回的value从acc中减掉。 在AggregateFunction 还提供了另外两个函数merge与resetAccumulator,merge 用在session window group或者批处理中,需要做一个合并的过程,resetAccumulator 用在批处理中,表示对中间结果的重置。

在源码中的调用位置

由于是聚合类的操作,仍然以GroupAggProcessFunction 来分析,在这里会调用自定义函数,但是只能是在非窗口的聚合中,通过processElement方法看下其调用流程

  • 中间结果acc是存储在状态中的,如果得到的状态为空,那么就会调用createAccumulator 方法
代码语言:javascript复制
var accumulators = state.value()

var inputCnt = cntState.value()

if(null== accumulators){

if(!inputC.change){

return

}

      firstRow =true

      accumulators =function.createAccumulators()//初始化

}else{

      firstRow =false

}
 

state是ValueState类型的,acc就存储在这里面,说明acc是容错并且具有一致性的。

  • 如果流入的数据是Insert类型就会调用accumulate方法,如果是Retract就调用retract方法,并且会调用getValue获取当前的结果数据
代码语言:javascript复制
if(inputC.change){

      inputCnt  =1

// accumulate input

function.accumulate(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)//会调用getValue

}else{

      inputCnt -=1

// retract input

function.retract(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)//会调用getValue

}
 

总结

自定义聚合函数是一个增量聚合的过程,中间结果保存在状态中,能够保证其容错与一致性语义。用户自定义聚合函数继承AggregateFunction即可,至少实现createAccumulator 、accumulate 、getValue这三个方法,其他方法都是可选的。

0 人点赞