一、pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cnblogs.yjmyzz</groupId>
    <artifactId>spring-boot-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>

        <!-- lombok (optional) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- minimal Spring application dependency (optional) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.4.RELEASE</version>
        </dependency>

        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- required by the DataStream API (StreamExecutionEnvironment, DataStream):
             flink-clients does NOT pull this in transitively in Flink 1.11 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- test utilities only — keep them off the runtime classpath -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
                         scala classes can be resolved later in the (Java) compile phase -->
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <jvmArgs>
                        <jvmArg>-Xms128m</jvmArg>
                        <jvmArg>-Xmx512m</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>
二、WordCount(批处理版本)
代码语言:javascript复制 1 import org.apache.flink.api.common.functions.FlatMapFunction;
2 import org.apache.flink.api.java.DataSet;
3 import org.apache.flink.api.java.ExecutionEnvironment;
4 import org.apache.flink.api.java.tuple.Tuple2;
5 import org.apache.flink.util.Collector;
6
7 /**
8 * WordCount批处理版本
9 */
10 public class BatchWordCount {
11
12 public static void main(String[] args) throws Exception {
13
14 // 1 设置环境
15 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
16
17 // 2. 定义数据
18 String wordFilePath = "/Users/jimmy/Downloads/word.txt";
19 DataSet<String> text = env.readTextFile(wordFilePath);
20
21 // 3. 处理逻辑
22 DataSet<Tuple2<String, Integer>> counts =
23
24 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
25 @Override
26 public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
27 //将每行按word拆分
28 String[] tokens = value.toLowerCase().split("\W ");
29
30 //收集(类似:map-reduce思路)
31 for (String token : tokens) {
32 if (token.length() > 0) {
33 out.collect(new Tuple2<>(token, 1));
34 }
35 }
36 }
37 })
38 //按Tuple2里的第0项,即:word分组
39 .groupBy(0)
40 //然后对Tuple2里的第1项求合
41 .sum(1);
42
43 // 4. 打印结果
44 counts.print();
45
46 }
47 }
注:数据文件/Users/jimmy/Downloads/word.txt的位置,大家可根据实际情况调整,该文件的内容类似:
hello java
hello flink
三、WordCount(流式处理版本)
代码语言:javascript复制 1 import org.apache.flink.api.common.functions.FlatMapFunction;
2 import org.apache.flink.api.java.tuple.Tuple2;
3 import org.apache.flink.streaming.api.datastream.DataStream;
4 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5 import org.apache.flink.util.Collector;
6
7 public class StreamWordCount {
8
9 public static void main(String[] args) throws Exception {
10 // 1 设置环境
11 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
12
13 // 2. 定义数据
14 DataStream<String> text = env.socketTextStream("127.0.0.1", 9999);
15
16 // 3. 处理逻辑
17 DataStream<Tuple2<String, Integer>> counts =
18
19 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
20 @Override
21 public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
22 //将每行按word拆分
23 String[] tokens = value.toLowerCase().split("\b ");
24
25 //收集(类似:map-reduce思路)
26 for (String token : tokens) {
27 if (token.trim().length() > 0) {
28 out.collect(new Tuple2<>(token.trim(), 1));
29 }
30 }
31 }
32 })
33 //按Tuple2里的第0项,即:word分组
34 .keyBy(value -> value.f0)
35 //然后对Tuple2里的第1项求合
36 .sum(1);
37
38 // 4. 打印结果
39 counts.print();
40
41 // execute program
42 env.execute("Streaming WordCount");
43
44 }
45 }
注:运行前,先在终端命令行输入nc -l 9999,开启一个tcp server作为流式处理的数据源,然后再运行上面的代码,然后命令行输入一些文本,即可看到输出