背景
Flink 任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为算子链Chain。Chain后的节点,总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。但如一个任务DAG过大,需根据实时情况对算子链Chain进行拆解操作。接下来对算子链三种策略进行说明、策略对应的使用方法、哪些算子可进行操作和在何处应用并举例讲解。
算子链Operators Chain优化
ChainingStrategy算子链策略
ChainingStrategy 是用来定义算子链接的策略。当一个算子和上游算子链接在一起,这意味着它们会运行在同一个线程。它们会合并为一个有多个运行步骤的算子。Operators Chain 是通过ChainingStrategy 算子链路策略枚举类设置的。源码如下:
代码语言:javascript复制public enum ChainingStrategy {
ALWAYS,
NEVER,
HEAD
}
ChainingStrategy支持以下三种方式:
- ALWAYS 算子会尽可能的链接在一起。为了优化性能,通常需要让算子尽可能链接在一起,同时增加并发度。
- NEVER 算子将不会和上下游的算子链接在一起。
- HEAD 算子将不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
在Flink程序中,各算子Operators,如Filter、FlatMap、Map、Project、Sink、Source和Window等都是打开的即取值ALWAYS算子会尽可能的链接在一起。也可通过执行环境变量进行全局关闭,但不一般建议这样,如:
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();//设置为全局不可用
ChainingStrategy方式应用
算子链策略默认是全局开启即取值ALWAYS,但也可在单算子或多算子上应用其开启、关闭或局部开启。是通过两种方法disableChaining和startNewChain来设置的,singleOutputOperator的两种方法实现(其他算子也是如此)源码如下:
代码语言:javascript复制@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}
@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
return setChainingStrategy(ChainingStrategy.HEAD);
}
举例说明:
定义一个有限流dataStream,此流上应用三个map简单操作,从应用了startNewChain方法开始后两个map运行在同一线程。
代码语言:javascript复制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.disableOperatorChaining();//设置为全局不可用
env.setParallelism(4);
DataStream<String> dataStream = env.fromCollection(Arrays.asList("A","B","C","D","E","F","G","H","I","J","K","L","M","N"));
DataStream<Tuple2<String,Integer>> dataStreamKV = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value,1);
}
}).startNewChain().map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of(value.f0,value.f1*100);
}
}).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of(value.f0,value.f1-10);
}
});
dataStreamKV.print();
env.execute();
}
disableChaining对应的是ChainingStrategy.NEVER 算子将不会和上下游的算子链接在一起。方法应用同样如此,笔者不再赘述。
其他优化
在Flink程序运行过程中,并行度取决于每个TaskManager上的slot数量而决定的。slot插槽是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。slot插槽可共享相同的JVM资源,同时对Flink提供维护的心跳等信息。详细可以参考Flink优化器与源码解析系列--内存模型详解里面slot与source资源相关内容。
总结
Operator Chain策略也是程序优化的一种方式。在Flink程序中,是全局默认开启的。多节点合成一个节点可以有效的减少网络传输,降低成本。实时情况对算子链Chain进行拆解操作,灵活运用。也需要配置其他参数进行优化如并行度等等。