序
本文主要研究一下flink Table的ScalarFunction
实例
代码语言:javascript复制public class HashCode extends ScalarFunction {
private int factor = 0;
@Override
public void open(FunctionContext context) throws Exception {
// access "hashcode_factor" parameter
// "12" would be the default value if parameter does not exist
factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
// Create the batch execution environment and a table environment on top of it.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Publish "hashcode_factor" as a global job parameter.
Configuration jobParameters = new Configuration();
jobParameters.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(jobParameters);

// Register the scalar function under the name "hashCode".
tableEnv.registerFunction("hashCode", new HashCode());

// Call the function through the Java Table API ...
myTable.select("string, string.hashCode(), hashCode(string)");

// ... and through SQL.
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
- HashCode继承了ScalarFunction,它定义了eval方法
ScalarFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala
/**
  * Base class for a user-defined scalar function.
  *
  * Besides the members declared here, a concrete function supplies its behavior through
  * public `eval` methods; [[getResultType]] and [[getParameterTypes]] receive the signature
  * of such an evaluation method.
  */
abstract class ScalarFunction extends UserDefinedFunction {

  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    * @throws ValidationException if the type of a parameter class cannot be extracted
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        // Translate the extraction failure into a validation error that tells the user
        // to supply the type information explicitly.
        case _: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
- ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP
CRowProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala
/**
  * [[ProcessFunction]] over [[CRow]] that compiles the given source code into a
  * `ProcessFunction[Row, Row]` when opened and delegates every element to it.
  *
  * @param name       name handed to `compile` together with the code; also used in the debug log
  * @param code       source code of the `ProcessFunction[Row, Row]` to compile
  * @param returnType type information returned by [[getProducedType]]
  */
class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {

  // Both fields are assigned in open().
  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _

  /**
    * Compiles and instantiates the wrapped function, hands it the runtime context,
    * opens it, and creates the collector wrapper used by [[processElement]].
    *
    * @param parameters configuration forwarded to the wrapped function's open call
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)

    this.cRowWrapper = new CRowWrappingCollector()
  }

  /**
    * Passes the row of the incoming [[CRow]] to the wrapped function.
    *
    * @param in  input element; its `row` is processed and its `change` is set on the wrapper
    * @param ctx context of this invocation, cast to the context type of the wrapped function
    * @param out collector that the wrapper is pointed at for this invocation
    */
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    // Point the reusable wrapper at the current collector and change flag before delegating.
    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  /** Returns the type information supplied to the constructor. */
  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the wrapped function. */
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
- CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成
ProcessFunction
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java
代码语言:javascript复制@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* Process one element from the input stream.
*
* <p>This function can output zero or more elements using the {@link Collector} parameter
* and also update internal state or set timers using the {@link Context} parameter.
*
* @param value The input value.
* @param ctx A {@link Context} that allows querying the timestamp of the element and getting
* a {@link TimerService} for registering timers and querying the time. The
* context is only valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Called when a timer set using {@link TimerService} fires.
*
* @param timestamp The timestamp of the firing timer.
* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
* querying the {@link TimeDomain} of the firing timer and getting a
* {@link TimerService} for registering timers and querying the time.
* The context is only valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* Information available in an invocation of {@link #processElement(Object, Context, Collector)}
* or {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class Context {
/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
*
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
public abstract Long timestamp();
/**
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class OnTimerContext extends Context {
/**
* The {@link TimeDomain} of the firing timer.
*/
public abstract TimeDomain timeDomain();
}
}
- ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement
DataStreamCalc
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
/**
  * Calc node of the DataStream plan. Its `translateToPlan` generates a process function from
  * the calc program and applies it to the translated input stream through a
  * [[CRowProcessRunner]].
  */
class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {

  //......

  /**
    * Translates the input node, generates the function code for the projection and the
    * optional condition of the calc program, and runs that code on the input stream.
    *
    * @param tableEnv    table environment providing the configuration
    * @param queryConfig query configuration forwarded to the input node's translation
    * @return the input stream with the generated process function applied
    */
  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {

    val tableConfig = tableEnv.getConfig

    val upstream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

    // Materialize time attributes in the condition; None when the program has no condition.
    val materializedCondition = Option(calcProgram.getCondition).map { rawCondition =>
      RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(rawCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
    }

    // Expand the local references of every projected expression.
    val projectedExprs = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)

    val codeGenerator = new FunctionCodeGenerator(tableConfig, false, inputSchema.typeInfo)

    val generated = generateFunction(
      codeGenerator,
      ruleDescription,
      inputSchema,
      schema,
      projectedExprs,
      materializedCondition,
      tableConfig,
      classOf[ProcessFunction[CRow, CRow]])

    val upstreamParallelism = upstream.getParallelism

    val runner = new CRowProcessRunner(
      generated.name,
      generated.code,
      CRowTypeInfo(schema.typeInfo))

    upstream
      .process(runner)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep the input's parallelism to ensure the order of accumulate and retract messages
      .setParallelism(upstreamParallelism)
  }
}
- DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法
小结
- ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP
- CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成;ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement
- DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法
doc
- Integrating UDFs with the Runtime