Flink中如何解析与传递参数

2020-11-03 14:32:14 浏览数 (1)

几乎所有的 Flink 应用程序(包括批处理与流处理程序)都需要依赖外部配置参数。例如,可以用来指定输入和输出源(如路径或者地址),系统参数(并发数,运行时配置)以及应用程序特定参数(通常用在自定义函数中)。

从 0.9 版本开始,Flink 提供了一个叫 ParameterTool 的简单程序,提供一些基础的工具来解决上述问题,当然你也可以不用这里描述的 ParameterTool,你可以使用其他框架,例如,Commons CLI、argparse4j 在 Flink 中也是支持的。

1. 解析参数

下面我们看一下如何获取配置并导入 ParameterTool 中。ParameterTool 提供了一系列预定义的静态方法来读取配置信息,ParameterTool 内部存储是一个 Map 数据结构,因此很容易与你自己的配置相集成。

1.1 读取.properties文件参数

下面方法将去读取一个本地 Properties 文件,并返回 key/value 键值对:

代码语言:javascript复制
String propertiesFile = "/Users/wy/study/dev.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
1.2 读取命令行参数

下面会从命令行中获取像 –input hdfs:///mydata –elements 42 这种形式的参数:

代码语言:javascript复制
public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
}
1.3 从系统属性中获取参数

当启动一个 JVM 时,你可以将系统属性传递给它:-Dinput=hdfs:///mydata,你还可以用这些系统属性来初始化 ParameterTool:

代码语言:javascript复制
ParameterTool parameter = ParameterTool.fromSystemProperties();
1.4 使用参数

我们已经将参数放在了 ParameterTool 对象中,那现在我们如何从 ParameterTool 对象中获取参数呢?ParameterTool 提供了一些内置的方法以供我们访问:

代码语言:javascript复制
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()

我们可以直接在 main() 方法中使用这些方法的返回值。例如,我们可以像如下设置算子的并行度:

代码语言:javascript复制
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

2. 传递参数

在数据处理的过程中,往往需要给函数传递一些参数,那下面看看有哪些方法可以进行参数的传递?

2.1 使用Configuration

我们可以通过 Configuration 对象为函数传递参数。如下显示将 Configuration 作为配置对象传递给用户定义的函数:

代码语言:javascript复制
ParameterTool parameters = ParameterTool.fromArgs(args);
Configuration conf = parameters.getConfiguration();
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .withParameters(conf);

在 Tokenizer 中,我们可以通过 open(Configuration conf) 方法访问传递过来的参数:

代码语言:javascript复制
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration conf) throws Exception {
      // 获取参数
	    conf.getInteger('xxx', -1);
    }
}

如果我们不需要使用 ParameterTool 来解析参数,我们可以直接定义 Configuration:

代码语言:javascript复制
Configuration conf = new Configuration();
conf.setString("xxx", "xxx");
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .withParameters(conf);
2.2 注册全局参数

除了上述方法之外,我们还可以在 ExecutionConfig 中将参数注册为全局作业参数,可以在 JobManager 的 WEB 界面或者用户自定义函数中访问配置值。如下代码所示注册全局参数:

代码语言:javascript复制
ParameterTool parameters = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

可以在任意 Rich 用户函数中访问参数:

代码语言:javascript复制
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
	    parameters.getRequired('xxx');
    }
}

0 人点赞