在使用Apache Flink对数据进行处理时候,通常需要设置并行度。并行度是Apache Flink中一个非常重要的概念。设置合理的并行度能够加快数据的处理效率,不合理的并行度会造成效率降低甚至是任务出错。 Apache Flink程序包含多个任务(source,transformations/operators,sink)。这些任务使用几个并行实例所进行执行,这些并行的实例称之为并行度。
如何设置并行度
Apache Flink支持在不同的级别设置并行度。配置文件、env级别、算子级别。
- 配置文件默认 在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。我们可以通过命令查看Flink配置文件的并行度。
$ cat flink-conf.yaml |grep "parallelism.default"
parallelism.default: 1
例如当前获取到的并行度为1。也就是说当你不设置并行度的时候它就会使用配置文件默认的并行度 1。 2. env级别 env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。
代码语言:javascript复制val env = Stream...
env.setParallelism(5)
- 客户端级别 如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。
./bin/flink run -p 5 ../wordCount-java*.jar
-p即设置WordCount的Job并行度为5。4. 算子级别 我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置
代码语言:javascript复制val env = Stream...
val text = ...
text.keyBy(XXX)
.flatMap(XXX).setParallelism(5) //计算时设置为5
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
- 并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。
从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别
在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等可能会需要不同的并行度来保证数据的快速读取与写入负载等。
并行度设置的数量
Apache Flink的并行度设置并不是说越大越好、数据处理的效率就越高。而是需要设置合理的并行度。那么何谓合理呢? Apache Flink的 并行度取决于每个TaskManager上的slot数量而决定的。Flink的JobManager把任务分成子任务提交给slot进行执行。相同的slot共享相同的JVM资源,同时对Flink提供维护的心跳等信息。 slot是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。这样来看,我们设置的并行度其实是与TaskManager所有Slot数量有关的。