Flink中: 你的Function是如何被执行的

2022-11-21 20:21:16 浏览数 (1)

在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比喻说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。

核心调用逻辑

当我们编写完成一个Flink-Job 就会将代码打包成为jar提交到集群中去,当整个资源申请、任务调度完成之后就开始执行这个job,从source到transform 到最后sink 都是在TaskManager 资源节点中执行。Flink-Job 会被划分为一个个Task(整个任务中的一部分处理逻辑)节点, 每一个Task节点都在一个Thread中执行,在这个Thread中会不断的调用UserFunction的相应方法(如上图)。这个是一个大概的处理流程, 让用户只需要关心自身业务处理逻辑,无须关心网络通信、数据传输等流程。

接下来介绍具体的调用逻辑:

当JobMaster 向TaskManager 提交Task(整个任务中的一部分处理逻辑)时,会携带该Task的相关信息, 之后:

  1. org.apache.flink.runtime.taskmanager.Task

创建一个Task对象,代表了并发Task中一个subTask, 里面包含了一个Thread对象,也就是提交的任务会在该Thread中执行。

  1. org.apache.flink.streaming.runtime.tasks.StreamTask

在Task中会创建StreamTask对象, 在StreamTask中完成任务的初始化工作(配置、operatorchain构建、状态恢复等)之后,执行数据处理流程。

  1. org.apache.flink.streaming.runtime.tasks.OperatorChain

Flink优化中有一环是operator-chain, 即将满足一定规则的operator链在一起,他们之前以函数调用的方式执行,减少(网络)数据传输,OperatorChain就代表了多个Operator。

  1. org.apache.flink.streaming.api.operators.StreamOperator

StreamOperator 代表了具体执行的某一个Operator, 每一个UserFunction都会有对应的StreamOperator, 例如MapFunction对应 StreamMap、KeyedProcessFunction 对应KeyedProcessOperator, 也就是在这些Operator中完成对应Function的调用。

Method 是如何被调用的

我们通常定义一个Function , 实现其相关的方法,例如MapFunction 实现map方法、WindowFunction 实现apply方法、KeyedProcessFunction 实现open/processElement/onTimer 方法,如果你的Function 还实现了CheckpointedFunction/CheckpointListener接口, 那么还得实现对应的initializeState、snapshotState、notifyCheckpointComplete方法等等。这些所有的方法都由对应的Operator调用, 下面以MapFunction 对应的StreamMap 这个operator 为例理解Function的调用。

代码语言:javascript复制
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • userFunction 即代表自定义的MapFunction , 其被processElement 方法内部调用, 而processElement 是被上面提到的StreamTask以OperatorChain 方式不断被调用。
  • StreamMap 继承了抽象的AbstractUdfStreamOperator 类, 该operator 包含了所有userFunction 的方法调用。(部分代码如下)
代码语言:javascript复制
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {

private static final long serialVersionUID = 1L;

/** The user function. */
protected final F userFunction;


@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}

@Override
public void close() throws Exception {
super.close();
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}

@Override
public void dispose() throws Exception {
super.dispose();
if (!functionsClosed) {
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}
}

// ------------------------------------------------------------------------
//  checkpointing and recovery
// ------------------------------------------------------------------------

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
}
}

@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
super.notifyCheckpointAborted(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
}
}
}

可以看出对于UserFunction 中method的调用核心点就在operator,每个不同的UserFunction 会对应不同的operator, 但是都会继承这个抽象的

AbstractUdfStreamOperator类, 通过这个类可以熟知其整体调用链路。

总结

本文主要以调用者的视角窥探UserFunction 的调用流程以及具体method 调用逻辑。

0 人点赞