因为网络上很多资料都过时了,有的是版本太老了,本文针对最新版本的1.13.2快速构建一个WordCount程序
项目介绍
本文创建一个可以从网络上读取输入,然后每5秒钟输出每个单词个数的项目
创建maven项目
代码语言:javascript复制mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
-DgroupId=mflink
-DartifactId=mflink
-Dversion=0.1
-Dpackage=myflink
-DinteractiveMode=false
用IDE打开这个项目,里面已经创建了两个类StreamingJob和BatchJob,本文使用StreamingJob来完成一个实时统计单词的任务
可以修改后面一些自定义的参数
编写逻辑
- 创建
StreamExecutionEnvironment
: 这是一个入口类,可以用来设置参数和创建数据源以及提交任务
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 从socket读取数据: 从本地端口号 9000 的 socket 中读取数据的数据源
DataStream<String> text = env.socketTextStream("localhost", 9000, "n");
这创建了一个字符串类型的 DataStream。DataStream 是 Flink 中做流处理的核心 API,上面定义了非常多常见的操作(如,过滤、转换、聚合、窗口、关联等)。
- 拆分单词: 将字符串数据解析成单词和次数(使用Tuple2<String, Integer>表示)(类似于MapReduce中的Map)
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
- 统计单词个数(类似于MapReduct中的Reduce)
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
.keyBy((KeySelector<Tuple2<String, Integer>, Object>) tuple -> tuple.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
- 输出到标准输出
windowCounts.print().setParallelism(1);
- 开始执行
// execute program
env.execute("Socket Window WordCount");
最后的 env.execute
调用是启动实际Flink作业所必需的。所有算子操作(例如创建源、聚合、打印)只是构建了内部算子操作的图形。只有在execute()
被调用时才会在提交到集群上或本地计算机上执行。
完整代码
代码语言:javascript复制public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
DataStream<String> text = env.socketTextStream("localhost", 9000, "n");
// 解析数据,按 word 分组,开窗,聚合
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
.keyBy((KeySelector<Tuple2<String, Integer>, Object>) tuple -> tuple.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
// 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
windowCounts.print().setParallelism(1);
// execute program
env.execute("Socket Window WordCount");
}
}
运行程序
使用netcat往端口输入
代码语言:javascript复制nc -lk 9000
启动StreamingJob统计
直接在IDE中启动就可以了
常见错误
代码语言:javascript复制java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction
解决方法: 把pom.xml文件中的<scope>provided</scope>
注释掉
Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
解决方案: .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
参考
- 5分钟从零构建第一个 Flink 应用