Flink SQL 中TableFunction使用分析

2022-04-18 13:03:15 浏览数 (1)

本篇幅介绍Flink Table/SQL中如何自定义一个表函数(TableFunction),介绍其基本用法以及与源码结合分析其调用流程。

基本使用

表函数TableFunction相对标量函数ScalarFunction一对一,它是一个一对多的情况,通常使用TableFunction来完成列转行的一个操作。先通过一个实际案例了解其用法:终端设备上报数据,数据类型包含温度、耗电量等,上报方式是以多条方式上报,例如:

现在希望得到如下数据格式:

这是一个典型的列转行或者一行转多行的场景,需要将data列进行拆分成为多行多列,先看下代码实现:

代码语言:javascript复制
public class MyUDTF extends TableFunction<Row>{



public void eval(String s){

JSONArray jsonArray =JSONArray.parseArray(s);

for(int i =0; i < jsonArray.size(); i  ){

JSONObject jsonObject = jsonArray.getJSONObject(i);

String type = jsonObject.getString("type");

String value = jsonObject.getString("value");

            collector.collect(Row.of(type, value));

}

}



@Overridepublic TypeInformation<Row> getResultType(){

returnTypes.ROW(Types.STRING(),Types.STRING());

}

}
 

在MyUDTF中继承了TableFunction<T>, 所有的自定义表函数都必须继承该抽象类,其中T表示返回的数据类型,通常如果是原子类型则直接指定例如String, 如果是复合类型通常会选择Row, FlinkSQL 通过类型提取可以自动识别返回的类型,如果识别不了需要重载其getResultType方法,指定其返回的TypeInformation,重点看下eval 方法定义:

  • eval 方法, 处理数据的方法,必须声明为public/not static,并且该方法可以重载,会自动根据不同的输入参数选择对应的eval, 在eval方法里面可以使用collector对象将数据发送出去,该对象是从TableFunction继承过来的。

调用如下:

代码语言:javascript复制
def main(args:Array[String]):Unit={

    val env =StreamExecutionEnvironment.getExecutionEnvironment

    val tabEnv =TableEnvironment.getTableEnvironment(env)

    tabEnv.registerFunction("udtf",newMyUDTF)

    val kafkaConfig =newProperties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

    val consumer =newFlinkKafkaConsumer[String]("topic1",newSimpleStringSchema(), kafkaConfig);



    val ds:DataStream[(String, java.lang.Long,String)]= env.addSource(consumer)

.map(x =>{

        val obj = JSON.parseObject(x, classOf[RawData])

Tuple3.apply(obj.devId, obj.time, obj.data)

})



    tabEnv.registerDataStream("tbl1", ds,'devId, 'time,'data)

    val rsTab = tabEnv.sqlQuery("select devId,`time`,`type`,`value` from tbl1 , LATERAL TABLE(udtf(data)) as t(`type`,`value`) ")

      .writeToSink(new PaulRetractStreamTableSink)

    env.execute()

  }
 

测试数据:

代码语言:javascript复制
{"devid":"dev01","time":1574944573000,"data":[{"type":"temperature","value":"10"},{"type":"battery","value":"1"}]}
 

得到结果:

代码语言:javascript复制
3>(true,dev01,1574944573000,temperature,10)

3>(true,dev01,1574944573000,battery,1)
 

至此拿到了符合要求的数据。在Flink SQL中使用TableFunction需要搭配LATERAL TABLE一起使用,将其认为是一张虚拟的表,整个过程就是一个Join with Table Function过程,左表(tbl1) 会join 右表(t1) 的每一条记录。但是也存在另外一种情况右表(t1)没有输出但是也需要左表输出那么可以使用LEFT JOIN LATERAL TABLE,用法如下:

代码语言:javascript复制
SELECT users, tag

FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

对于右表没有输出会自动补上null。

源码分析

在介绍源码分析之前先安利一个小技巧,很多时候比较难找到Flink SQL解析之后的任务具体执行过程,这个时候可以通过先打印其执行计划,使用方式:

代码语言:javascript复制
println(tabEnv.explain(rsTab))

就可以得到其抽象语法树、逻辑执行计划、物理执行计划:

代码语言:javascript复制
==AbstractSyntaxTree==

LogicalProject(devId=[$0], time=[$1], type=[$3], value=[$4])

LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])

LogicalTableScan(table=[[tbl1]])

LogicalTableFunctionScan(invocation=[udtf($cor0.data)], rowType=[RecordType(VARCHAR(65536) f0, VARCHAR(65536) f1)], elementType=[class[Ljava.lang.Object;])



==OptimizedLogicalPlan==

DataStreamCalc(select=[devId, time, f0 AS type, f1 AS value])

DataStreamCorrelate(invocation=[udtf($cor0.data)], correlate=[table(udtf($cor0.data))],select=[devId, time, data, f0, f1], rowType=[RecordType(VARCHAR(65536) devId, BIGINT time, VARCHAR(65536) data, VARCHAR(65536) f0, VARCHAR(65536) f1)], joinType=[INNER])

DataStreamScan(table=[[tbl1]])



==PhysicalExecutionPlan==

Stage1:DataSource

    content : collect elements withCollectionInputFormat



Stage2:Operator

        content :Map

        ship_strategy : FORWARD



Stage3:Operator

            content :from:(devId, time, data)

            ship_strategy : FORWARD

...........
 

可以从逻辑执行计划入手,Table Function Join 对应DataStreamCorrelate,重点在于其translateToPlan方法:

  • generateFunction 调用,生成一个ProcessFunction函数,内部封装用户自定义的TableFunction, 在该ProcessFunction里面会调用TableFunction的eval方法,由于该Function是动态生成的,可以通过debug方法查看,这里感受一下在processElement里面调用eval的代码:
代码语言:javascript复制
function_udf$MyUDTF$086f769e79e46e52752c8500480e4b32.eval(isNull$21 ?null:(java.lang.String) result$20);
  1. function_udfMyUDTF086f769e79e46e52752c8500480e4b32.eval(isNull21 ?null:(java.lang.String) result20);
  • generateCollector调用,生成的是一个TableFunctionCollector 类型的collector,这部分也是动态生成的
  • CRowCorrelateProcessRunner 也是一个ProcessFunction, 内部包含了generateFunction生成的function 与generateCollector生成的collector, 在其初始化open的时候会将该collector赋给function

接下来从CRowCorrelateProcessRunner的processElement方法看整个调用流程:

代码语言:javascript复制
    cRowWrapper.out=out

    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)

    collector.setInput(in.row)//重点input 信息设置到动态生成的collector

    collector.reset()
 
代码语言:javascript复制
function.processElement(

in.row,

      ctx.asInstanceOf[ProcessFunction[Row,Row]#Context],

      cRowWrapper)
 

这步调用动态生成的function, 在其processElement里面调用eval方法,eval 会调用动态生成的collector,这个步骤就可以理解为是一个join过程, 最终输出组合数据。

0 人点赞