StreamOperator源码简析

2022-04-18 13:03:57 浏览数 (1)

StreamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator

  • env.addSource对应StreamSource
  • dataStream.map 对应StreamMap
  • dataStrem.window对应WindowOperator
  • dataStream.addSink对应StreamSink
  • dataStream.keyBy(..).process对应KeyedProcessOperator

StreamOperator涉及数据处理、checkpoint、状态存储、定时调用等,本篇幅将从源码角度分析StreamOperator所涉及的核心调用流程。

StreamOperator层级结构

最顶层是一个StreamOperator的接口,定义了其生命周期一些方法,继承接口如下:

  • CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成后的回调方法
  • KeyContext接口,用于当前key的切换,使用在KeyedStream中state的key设置
  • Disposable接口,dispose方法定义了资源释放
  • Serializable序列化接口

AbstractStreamOperator是StreamOperator的基础抽象实现类,所有的operator都必须继承该抽象类; AbstractUdfStreamOperator 是继承AbstractStreamOperator的抽象实现类,其内部包含了userFunction, 在Task的生命周期都会调用userFunction中对应的方法; OneInputStreamOperator、TwoInputStreamOperator都是继承StreamOperator的接口,分别表示处理一个输入、两个输入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;

  • OneInputStreamOperator实现类StreamMap、WindowOperator、KeyedProcessOperator等单流入处理operator
  • TwoInputStreamOperator实现类CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流处理operator

StreamSource表示的source端的operator,其既没有实现OneInputStreamOperator接口也没有实现TwoInputStreamOperator接口,由于其为流处理的源头,不需要接受输入

AbstractStreamOperator/AbstractUdfStreamOperator分析

AbstractStreamOperator是所有operator的基础抽象类,而AbstractUdfStreamOperator则是面向userFunction调用,接下来分析一下这两个类,其大部分方法都是由StreamTask触发调用,用于初始化或者资源释放等操作,以StreamTask.invoke方法为入口来分析里面的方法:

  • initializeState状态初始化,会调用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend状态后端,定时器恢复初始化,对于KeyedState来说会自动初始化恢复,但是operatorState则需要手动初始化恢复,所以在其继承的AbstractUdfStreamOperator会调用userFunction的initializeState方法,前提是该userFunction需要实现CheckpointedFunction接口;
  • open初始化方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写open方法完成一些用户初始化工作,例如创建资源链接
  • run方法,在任务正常情况下一直执行的方法,根据收到的不同数据类型调用AbstractStreamOperator不同的方法
    1. 如果是watermark,会调用其processWatermark方法,在该方法里面做一些定时触发的判断与调用
    2. 如果是LatencyMarker,其表示的是一个延时标记,同于统计数据从source到下游operator耗时,会调用 processLatencyMarker方法,在该方法里面会上报Histogram类型的metric, 在默认情况下该功能是关闭的
    3. 如果是StreamRecord,也就是处理的业务数据,首先会调用setKeyContextElement方法,用于切换 KeyedStream类型的的statebackend的当前key, 然后调用processElement具体的数据处理流程
    4. 如果是CheckpointBarrier,表示的是需要checkpoint,首先会调用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一个空实现doNothing,然后调用snapshotState方法,在AbstractUdfStreamOperator会调用userFunction的snapshotState方法,前提是该userFunction需要实现CheckpointedFunction接口;
  • close方法,任务正常结束调用方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写close方法完成一些资源释放;
  • dispose方法,任务正常结束或者异常结束调用的方法,如果是异常结束那么就会调用到close方法,正常结束不会重复调用,在dispose里面完成一些状态最终资源的释放;

其他方法:

  • setup方法,初始化做一些参数配置
  • notifyCheckpointComplete方法,在checkpoint完成时调用的方法,面向用户实现的userFunction需要实现CheckpointListener接口

0 人点赞