StreamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator
- env.addSource对应StreamSource
- dataStream.map 对应StreamMap
- dataStrem.window对应WindowOperator
- dataStream.addSink对应StreamSink
- dataStream.keyBy(..).process对应KeyedProcessOperator
StreamOperator涉及数据处理、checkpoint、状态存储、定时调用等,本篇幅将从源码角度分析StreamOperator所涉及的核心调用流程。
StreamOperator层级结构
最顶层是一个StreamOperator的接口,定义了其生命周期一些方法,继承接口如下:
- CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成后的回调方法
- KeyContext接口,用于当前key的切换,使用在KeyedStream中state的key设置
- Disposable接口,dispose方法定义了资源释放
- Serializable序列化接口
AbstractStreamOperator是StreamOperator的基础抽象实现类,所有的operator都必须继承该抽象类; AbstractUdfStreamOperator 是继承AbstractStreamOperator的抽象实现类,其内部包含了userFunction, 在Task的生命周期都会调用userFunction中对应的方法; OneInputStreamOperator、TwoInputStreamOperator都是继承StreamOperator的接口,分别表示处理一个输入、两个输入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;
- OneInputStreamOperator实现类StreamMap、WindowOperator、KeyedProcessOperator等单流入处理operator
- TwoInputStreamOperator实现类CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流处理operator
StreamSource表示的source端的operator,其既没有实现OneInputStreamOperator接口也没有实现TwoInputStreamOperator接口,由于其为流处理的源头,不需要接受输入
AbstractStreamOperator/AbstractUdfStreamOperator分析
AbstractStreamOperator是所有operator的基础抽象类,而AbstractUdfStreamOperator则是面向userFunction调用,接下来分析一下这两个类,其大部分方法都是由StreamTask触发调用,用于初始化或者资源释放等操作,以StreamTask.invoke方法为入口来分析里面的方法:
- initializeState状态初始化,会调用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend状态后端,定时器恢复初始化,对于KeyedState来说会自动初始化恢复,但是operatorState则需要手动初始化恢复,所以在其继承的AbstractUdfStreamOperator会调用userFunction的initializeState方法,前提是该userFunction需要实现CheckpointedFunction接口;
- open初始化方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写open方法完成一些用户初始化工作,例如创建资源链接
- run方法,在任务正常情况下一直执行的方法,根据收到的不同数据类型调用AbstractStreamOperator不同的方法
- 如果是watermark,会调用其processWatermark方法,在该方法里面做一些定时触发的判断与调用
- 如果是LatencyMarker,其表示的是一个延时标记,同于统计数据从source到下游operator耗时,会调用 processLatencyMarker方法,在该方法里面会上报Histogram类型的metric, 在默认情况下该功能是关闭的
- 如果是StreamRecord,也就是处理的业务数据,首先会调用setKeyContextElement方法,用于切换 KeyedStream类型的的statebackend的当前key, 然后调用processElement具体的数据处理流程
- 如果是CheckpointBarrier,表示的是需要checkpoint,首先会调用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一个空实现doNothing,然后调用snapshotState方法,在AbstractUdfStreamOperator会调用userFunction的snapshotState方法,前提是该userFunction需要实现CheckpointedFunction接口;
- close方法,任务正常结束调用方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写close方法完成一些资源释放;
- dispose方法,任务正常结束或者异常结束调用的方法,如果是异常结束那么就会调用到close方法,正常结束不会重复调用,在dispose里面完成一些状态最终资源的释放;
其他方法:
- setup方法,初始化做一些参数配置
- notifyCheckpointComplete方法,在checkpoint完成时调用的方法,面向用户实现的userFunction需要实现CheckpointListener接口