Flink1.4 并发执行

2019-08-07 08:32:52 浏览数 (1)

本节介绍如何在Flink中配置程序的并行执行。一个Flink程序由多个任务(transformations/operatorsdata sourcessinks)组成。一个任务被分成多个并发实例来执行,每个并发实例只处理任务输入数据的一个子集。一个任务的并发实例的个数称为并发度(parallelism)。

如果你想使用保存点,也应该考虑设置最大并发度。从保存点恢复时,可以更改特定算子或整个程序的并发度,并且此配置指定了并发的上限。

1. 设置并发度

一个任务的并发度可以在Flink中指定不同级别。

1.1 算子级别

单个算子,数据源,sink可以通过调用setParallelism()方法来定义并发度。例如,像这样:

Java版本:

代码语言:javascript复制
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

Scala版本:

代码语言:javascript复制
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")
1.2 执行环境级别

如这所述,Flink程序是在执行环境的上下文中执行的。执行环境为它执行的所有算子,数据源和数据sink提供了默认的并发度。执行环境的并发度可以通过显式配置一个算子的并发度来覆盖。

执行环境的默认并发度可以通过调用setParallelism()方法来指定。要为执行的所有算子,数据源和sink设置并发度为3,请按如下方式设置执行环境的默认并发度:

Java版本:

代码语言:javascript复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

Scala版本:

代码语言:javascript复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")
1.3 客户端级别

在向Flink提交作业时,可以在客户端设置并发度。客户端可以是JavaScala程序。Flink的命令行接口(CLI)就是一种客户端。

对于CLI客户端,可以使用-p指定并发度参数。 例如:

代码语言:javascript复制
./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Java/Scala程序中,并发度设置如下:

Java版本:

代码语言:javascript复制
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

Scala版本:

代码语言:javascript复制
try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}
1.4 系统级别

可以通过在./conf/flink-conf.yaml中设置parallelism.default属性来为所有执行环境定义全系统默认并发度。详细信息请参阅配置文档。

2. 设置最大并发度

最大并发度可以在可以设置并发度的地方设置(客户端级别和系统级别除外)。你可以调用setMaxParallelism()取代setParallelism()方法来设置最大并发度。

最大并发度的默认设置大致为operatorParallelism (operatorParallelism / 2),下限为127,上限为32768

备注:

代码语言:javascript复制
将最大并发度设置为非常大的数值可能会对性能造成不利影响,因为一些后端状态必须保持在内部数据结构,而这些内部数据结构随key-groups(这是可扩展状态的内部实现机制)的数量进行扩展。(some state backends have to keep internal data structures that scale with the number of key-groups (which are the internal implementation mechanism for rescalable state).)

备注:

代码语言:javascript复制
Flink版本:1.4

0 人点赞