在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比喻说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。
核心调用逻辑
当我们编写完成一个Flink-Job 就会将代码打包成为jar提交到集群中去,当整个资源申请、任务调度完成之后就开始执行这个job,从source到transform 到最后sink 都是在TaskManager 资源节点中执行。Flink-Job 会被划分为一个个Task(整个任务中的一部分处理逻辑)节点, 每一个Task节点都在一个Thread中执行,在这个Thread中会不断的调用UserFunction的相应方法(如上图)。这个是一个大概的处理流程, 让用户只需要关心自身业务处理逻辑,无须关心网络通信、数据传输等流程。
接下来介绍具体的调用逻辑:
当JobMaster 向TaskManager 提交Task(整个任务中的一部分处理逻辑)时,会携带该Task的相关信息, 之后:
- org.apache.flink.runtime.taskmanager.Task
创建一个Task对象,代表了并发Task中一个subTask, 里面包含了一个Thread对象,也就是提交的任务会在该Thread中执行。
- org.apache.flink.streaming.runtime.tasks.StreamTask
在Task中会创建StreamTask对象, 在StreamTask中完成任务的初始化工作(配置、operatorchain构建、状态恢复等)之后,执行数据处理流程。
- org.apache.flink.streaming.runtime.tasks.OperatorChain
Flink优化中有一环是operator-chain, 即将满足一定规则的operator链在一起,他们之前以函数调用的方式执行,减少(网络)数据传输,OperatorChain就代表了多个Operator。
- org.apache.flink.streaming.api.operators.StreamOperator
StreamOperator 代表了具体执行的某一个Operator, 每一个UserFunction都会有对应的StreamOperator, 例如MapFunction对应 StreamMap、KeyedProcessFunction 对应KeyedProcessOperator, 也就是在这些Operator中完成对应Function的调用。
Method 是如何被调用的
我们通常定义一个Function , 实现其相关的方法,例如MapFunction 实现map方法、WindowFunction 实现apply方法、KeyedProcessFunction 实现open/processElement/onTimer 方法,如果你的Function 还实现了CheckpointedFunction/CheckpointListener接口, 那么还得实现对应的initializeState、snapshotState、notifyCheckpointComplete方法等等。这些所有的方法都由对应的Operator调用, 下面以MapFunction 对应的StreamMap 这个operator 为例理解Function的调用。
代码语言:javascript复制public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
- userFunction 即代表自定义的MapFunction , 其被processElement 方法内部调用, 而processElement 是被上面提到的StreamTask以OperatorChain 方式不断被调用。
- StreamMap 继承了抽象的AbstractUdfStreamOperator 类, 该operator 包含了所有userFunction 的方法调用。(部分代码如下)
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
/** The user function. */
protected final F userFunction;
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
@Override
public void close() throws Exception {
super.close();
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}
@Override
public void dispose() throws Exception {
super.dispose();
if (!functionsClosed) {
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}
}
// ------------------------------------------------------------------------
// checkpointing and recovery
// ------------------------------------------------------------------------
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
}
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
super.notifyCheckpointAborted(checkpointId);
if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
}
}
}
可以看出对于UserFunction 中method的调用核心点就在operator,每个不同的UserFunction 会对应不同的operator, 但是都会继承这个抽象的
AbstractUdfStreamOperator类, 通过这个类可以熟知其整体调用链路。
总结
本文主要以调用者的视角窥探UserFunction 的调用流程以及具体method 调用逻辑。