A Look at Flink's TableFunction

2019-02-08 11:20:19

This article takes a look at Flink's TableFunction.

Example

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";

    public Split(String separator) {
        this.separator = separator;
    }

    // eval must be public; it is called once per input row and may emit any number of rows.
    // Note that String.split treats the separator as a regular expression.
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Register myTable as "MyTable" so that the SQL queries below can refer to it.
tableEnv.registerTable("MyTable", myTable);

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
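  • This example defines a custom Split function and registers it with TableEnvironment.registerFunction. It then uses the function either through the Table API (join / leftOuterJoin) or through TableEnvironment.sqlQuery. Split defines a public eval method, which emits output rows by calling collect.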
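The difference between join (CROSS JOIN LATERAL) and leftOuterJoin (LEFT JOIN LATERAL ... ON TRUE) only appears when eval emits nothing for an input row. The plain-Java sketch below simulates both joins for Split("#") without Flink.

- SplitSketch and LateralJoinSketch are hypothetical classes.
- Rows are modeled as List<Object> rather than Flink's Row or Tuple types.
- The sketch relies on a detail of String.split: "#".split("#") returns an empty array, so that input row produces no output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Split: emits (word, length) rows through a callback,
// the way Split.eval emits rows through collect(...).
class SplitSketch {
    private final String separator;

    SplitSketch(String separator) {
        this.separator = separator;
    }

    void eval(String str, Consumer<List<Object>> collect) {
        for (String s : str.split(separator)) {
            collect.accept(Arrays.<Object>asList(s, s.length()));
        }
    }
}

// Hypothetical simulation of joining a one-column table [a: String] with the table function.
class LateralJoinSketch {
    // leftOuter == false: join / CROSS JOIN LATERAL, input rows with no output are dropped.
    // leftOuter == true: leftOuterJoin / LEFT JOIN LATERAL ... ON TRUE, such rows are kept with nulls.
    static List<List<Object>> join(List<String> table, SplitSketch split, boolean leftOuter) {
        List<List<Object>> result = new ArrayList<>();
        for (String a : table) {
            List<List<Object>> emitted = new ArrayList<>();
            split.eval(a, emitted::add);
            if (emitted.isEmpty() && leftOuter) {
                result.add(Arrays.<Object>asList(a, null, null));
            }
            for (List<Object> row : emitted) {
                result.add(Arrays.<Object>asList(a, row.get(0), row.get(1)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> myTable = Arrays.asList("hello#flink", "#");
        SplitSketch split = new SplitSketch("#");
        System.out.println(join(myTable, split, false));
        System.out.println(join(myTable, split, true));
    }
}
```

With this input, the cross join yields two rows. The left outer join additionally keeps the "#" row, with nulls for word and length.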

UserDefinedFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

abstract class UserDefinedFunction extends Serializable {
  /**
    * Setup method for user-defined function. It can be used for initialization work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = {}

  /**
    * Tear-down method for user-defined function. It can be used for clean up work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def close(): Unit = {}

  /**
    * @return true if and only if a call to this function is guaranteed to always return
    *         the same result given the same parameters; true is assumed by default
    *         if user's function is not pure functional, like random(), date(), now()...
    *         isDeterministic must return false
    */
  def isDeterministic: Boolean = true

  final def functionIdentifier: String = {
    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
    getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
  }

  /**
    * Returns the name of the UDF that is used for plan explain and logging.
    */
  override def toString: String = getClass.getSimpleName

}
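  • UserDefinedFunction defines the following methods:
    - open and close, lifecycle hooks that do nothing by default.
    - isDeterministic, which returns true by default.
    - the final functionIdentifier, which combines the canonical class name with an MD5 hash of the serialized function instance.
    - toString, which returns the simple class name used for plan explanations and logging.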
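functionIdentifier can be reproduced roughly in plain Java. This sketch rests on two assumptions, and Flink's exact encoding may differ:

- EncodingUtils.encodeObjectToString Java-serializes the instance and Base64-encodes the bytes (URL-safe, no padding).
- md5 followed by hex yields the lowercase hex MD5 digest of that string's UTF-8 bytes.

SplitFn and FunctionIdentifierSketch are hypothetical names. The sketch illustrates that the identifier depends on the function's serialized state. As a result, new Split("#") and new Split(" ") get different identifiers even though they share a class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical function class; its separator field is part of the serialized state.
class SplitFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String separator;

    SplitFn(String separator) {
        this.separator = separator;
    }
}

class FunctionIdentifierSketch {
    // Assumption: Java serialization followed by URL-safe Base64 without padding.
    static String encodeObjectToString(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    // Lowercase hex MD5 digest of the string's UTF-8 bytes.
    static String md5Hex(String s) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    // Same shape as getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
    static String functionIdentifier(Serializable fn) {
        try {
            String md5 = md5Hex(encodeObjectToString(fn));
            return fn.getClass().getCanonicalName().replace('.', '$') + "$" + md5;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException("cannot compute function identifier", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(functionIdentifier(new SplitFn("#")));
        System.out.println(functionIdentifier(new SplitFn(" ")));
    }
}
```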

TableFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
    * Emit an output row.
    *
    * @param row the output row
    */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * The code generated collector used to emit row.
    */
  private var collector: Collector[T] = _

  /**
    * Internal use. Sets the current collector.
    */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }

}
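  • TableFunction extends UserDefinedFunction and defines four methods:
    - collect forwards each emitted row to a code-generated Collector.
    - setCollector is how the runtime injects that Collector.
    - getResultType returns null by default, which tells Flink to extract the result type itself.
    - getParameterTypes derives the operand TypeInformation for an eval signature via TypeExtractor.getForClass, and throws a ValidationException if the types cannot be determined automatically.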
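The collect/setCollector pair means a TableFunction never returns its rows. Instead, a collector is installed first and eval pushes rows into it.

Below is a plain-Java model of that contract. The names SimpleCollector, MiniTableFunction, MiniSplit and CollectorSketch are hypothetical stand-ins for Flink's Collector and TableFunction. getResultType returning null mirrors the default above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for org.apache.flink.util.Collector.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical Java model of TableFunction's collect/setCollector contract.
abstract class MiniTableFunction<T> {
    private SimpleCollector<T> collector;

    // Set by the runtime before eval runs; user code only calls collect.
    final void setCollector(SimpleCollector<T> collector) {
        this.collector = collector;
    }

    // Forward one output row to whatever collector is currently installed.
    protected void collect(T row) {
        collector.collect(row);
    }

    // Like TableFunction.getResultType: null means "let the framework extract the type".
    Class<?> getResultType() {
        return null;
    }
}

// Hypothetical Split variant that emits only the words.
class MiniSplit extends MiniTableFunction<String> {
    public void eval(String str) {
        for (String s : str.split("#")) {
            collect(s);
        }
    }
}

class CollectorSketch {
    // Plays the role of the runtime: install a collector, then invoke eval for one input.
    static List<String> run(MiniSplit fn, String input) {
        List<String> out = new ArrayList<>();
        fn.setCollector(out::add);
        fn.eval(input);
        return out;
    }

    public static void main(String[] args) {
        MiniSplit split = new MiniSplit();
        System.out.println(run(split, "a#b#c"));
        System.out.println(run(split, "d"));
    }
}
```

Because the collector is just a mutable field, the same function instance can be pointed at a different collector before it runs. This keeps eval free of any knowledge about where its rows go.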

ProcessOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

@Internal
public class ProcessOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    private transient ContextImpl context;

    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
    private long currentWatermark = Long.MIN_VALUE;

    public ProcessOperator(ProcessFunction<IN, OUT> function) {
        super(function);

        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);

        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    //......
}
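  • ProcessOperator.processElement performs three steps:
    - It copies the element's timestamp onto the TimestampedCollector.
    - It exposes the element through the context.
    - It calls userFunction.processElement.

    When a table function is joined in a streaming (DataStream-based) job, userFunction here is a CRowCorrelateProcessRunner.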
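The key step in processElement is collector.setTimestamp(element). Every record the user function emits for that input inherits the input's timestamp, or no timestamp if the input had none.

Below is a plain-Java model of that sequence that leaves out the Context and timer handling. TsRecord, StampingCollector and ProcessOperatorSketch are hypothetical stand-ins for StreamRecord, TimestampedCollector and ProcessOperator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamRecord: a value plus an optional timestamp (null = none).
final class TsRecord<T> {
    final T value;
    final Long timestamp;

    TsRecord(T value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return value + "@" + timestamp;
    }
}

// Hypothetical model of TimestampedCollector: stamps each emitted value with the current timestamp.
final class StampingCollector<T> implements Consumer<T> {
    private final List<TsRecord<T>> output;
    private Long timestamp;

    StampingCollector(List<TsRecord<T>> output) {
        this.output = output;
    }

    void setTimestamp(TsRecord<?> input) {
        this.timestamp = input.timestamp; // copies null too, i.e. "no timestamp"
    }

    @Override
    public void accept(T value) {
        output.add(new TsRecord<>(value, timestamp));
    }
}

// Hypothetical model of the processElement sequence in ProcessOperator (Context omitted).
final class ProcessOperatorSketch<IN, OUT> {
    private final BiConsumer<IN, Consumer<OUT>> userFunction;
    private final StampingCollector<OUT> collector;

    ProcessOperatorSketch(BiConsumer<IN, Consumer<OUT>> userFunction, List<TsRecord<OUT>> output) {
        this.userFunction = userFunction;
        this.collector = new StampingCollector<>(output);
    }

    void processElement(TsRecord<IN> element) {
        collector.setTimestamp(element);               // outputs inherit the input timestamp
        userFunction.accept(element.value, collector); // the user function emits through the collector
    }

    public static void main(String[] args) {
        List<TsRecord<String>> out = new ArrayList<>();
        ProcessOperatorSketch<String, String> op = new ProcessOperatorSketch<String, String>(
                (line, collector) -> {
                    for (String word : line.split("#")) {
                        collector.accept(word);
                    }
                },
                out);
        op.processElement(new TsRecord<>("a#b", 100L));
        op.processElement(new TsRecord<>("c", null));
        System.out.println(out);
    }
}
```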

CRowCorrelateProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(collector)
    FunctionUtils.closeFunction(function)
  }
}
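  • In open, CRowCorrelateProcessRunner compiles and instantiates the generated TableFunctionCollector and ProcessFunction. In processElement it does the following:
    - It points the collector at a CRowWrappingCollector that carries the input row's change flag.
    - It sets the input row.
    - It calls function.processElement.

    That generated function in turn calls Split's eval method.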
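CRow pairs a Row with a change flag: true for an accumulate message, false for a retraction. processElement copies in.change into the CRowWrappingCollector before calling the generated function, so every row that eval emits for an input inherits that input's flag.

Below is a plain-Java model of this, using hypothetical ChangeRow, ChangeWrappingCollector and CorrelateRunnerSketch classes. The Split-like loop stands in for the generated ProcessFunction and collector code, which join each emitted row with the input row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for CRow: a row plus a change flag (true = accumulate, false = retract).
final class ChangeRow {
    final List<Object> row;
    final boolean change;

    ChangeRow(List<Object> row, boolean change) {
        this.row = row;
        this.change = change;
    }

    @Override
    public String toString() {
        return (change ? "+" : "-") + row;
    }
}

// Hypothetical model of CRowWrappingCollector: wraps each plain row with the current change flag.
final class ChangeWrappingCollector implements Consumer<List<Object>> {
    Consumer<ChangeRow> out;
    private boolean change = true;

    void setChange(boolean change) {
        this.change = change;
    }

    @Override
    public void accept(List<Object> row) {
        out.accept(new ChangeRow(row, change));
    }
}

// Hypothetical model of CRowCorrelateProcessRunner.processElement; the loop stands in for the
// generated function that calls Split.eval and joins each result with the input row.
final class CorrelateRunnerSketch {
    private final ChangeWrappingCollector wrapper = new ChangeWrappingCollector();
    private final String separator;

    CorrelateRunnerSketch(String separator) {
        this.separator = separator;
    }

    void processElement(ChangeRow in, Consumer<ChangeRow> out) {
        wrapper.out = out;            // like cRowWrapper.out = out
        wrapper.setChange(in.change); // like cRowWrapper.setChange(in.change)
        String a = (String) in.row.get(0);
        for (String s : a.split(separator)) {
            wrapper.accept(Arrays.<Object>asList(a, s, s.length()));
        }
    }

    public static void main(String[] args) {
        List<ChangeRow> out = new ArrayList<>();
        CorrelateRunnerSketch runner = new CorrelateRunnerSketch("#");
        runner.processElement(new ChangeRow(Arrays.<Object>asList("x#y"), true), out::add);
        runner.processElement(new ChangeRow(Arrays.<Object>asList("x#y"), false), out::add);
        System.out.println(out);
    }
}
```

A retraction of the input "x#y" thus retracts both correlated rows, which keeps downstream results consistent.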

Summary

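  • TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods. UserDefinedFunction defines open, close, and functionIdentifier, as well as isDeterministic and toString.
  • To write a custom TableFunction, extend the TableFunction class and also define a public eval method. The parameter types of eval depend on how the function is called. In this example split is called with the table's a field, which is a String, so eval takes a String parameter.
  • In a streaming job, the call chain runs as follows:
    - ProcessOperator.processElement calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner.
    - CRowCorrelateProcessRunner.processElement calls function.processElement.
    - That generated function calls Split's eval method.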
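eval is not declared on TableFunction itself, so Flink has to locate it by reflection. The Flink documentation also allows a table function to overload eval with several signatures.

The plain-Java sketch below shows the idea of picking a public eval by argument types. OverloadedSplit and EvalLookupSketch are hypothetical. The simple assignability check is much simpler than Flink's actual method resolution.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical function with two overloaded public eval methods.
class OverloadedSplit {
    final List<String> emitted = new ArrayList<>();

    public void eval(String str) {
        eval(str, "#");
    }

    public void eval(String str, String separator) {
        for (String s : str.split(separator)) {
            emitted.add(s);
        }
    }
}

class EvalLookupSketch {
    // Find a public, non-static method named "eval" whose parameters accept the given argument types.
    static Optional<Method> findEval(Class<?> fnClass, Class<?>... argTypes) {
        for (Method m : fnClass.getMethods()) {
            if (!m.getName().equals("eval") || Modifier.isStatic(m.getModifiers())) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    // Resolve eval from the runtime types of the arguments, then invoke it.
    static void call(Object fn, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Optional<Method> eval = findEval(fn.getClass(), types);
        if (!eval.isPresent()) {
            throw new IllegalArgumentException("no matching public eval method");
        }
        try {
            eval.get().invoke(fn, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OverloadedSplit split = new OverloadedSplit();
        call(split, "a#b");      // resolves eval(String)
        call(split, "c d", " "); // resolves eval(String, String)
        System.out.println(split.emitted);
    }
}
```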

doc

  • Table Functions
