聊聊flink Table的ScalarFunction

2019-02-10 08:34:59 浏览数 (1)

本文主要研究一下flink Table的ScalarFunction

实例

代码语言:javascript复制
public class HashCode extends ScalarFunction {
​
    private int factor = 0;
​
    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
    }
​
    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
​
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
​
// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);
​
// register the function
tableEnv.registerFunction("hashCode", new HashCode());
​
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
​
// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
  • HashCode继承了ScalarFunction,它定义了eval方法

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

代码语言:javascript复制
abstract class ScalarFunction extends UserDefinedFunction {
​
  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }
​
  // ----------------------------------------------------------------------------------------------
​
  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
​
  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be "  
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
  • ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

代码语言:javascript复制
class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {
​
  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _
​
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name nn Code:n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)
​
    this.cRowWrapper = new CRowWrappingCollector()
  }
​
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {
​
    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }
​
  override def getProducedType: TypeInformation[CRow] = returnType
​
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
  • CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

代码语言:javascript复制
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
​
    private static final long serialVersionUID = 1L;
​
    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter
     * and also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
     *            a {@link TimerService} for registering timers and querying the time. The
     *            context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
​
    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *            querying the {@link TimeDomain} of the firing timer and getting a
     *            {@link TimerService} for registering timers and querying the time.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
​
    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {
​
        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();
​
        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();
​
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
​
    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
​
}
  • ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

代码语言:javascript复制
class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {
​
  //......
​
  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {
​
    val config = tableEnv.getConfig
​
    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
​
    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }
​
    // filter out time attributes
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)
​
    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)
​
    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])
​
    val inputParallelism = inputDataStream.getParallelism
​
    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))
​
    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
  • DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法

小结

  • ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP
  • CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成;ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement
  • DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法

doc

  • Integrating UDFs with the Runtime

0 人点赞