1. Maven 依赖
<properties>
    <!-- Flink version -->
    <flink.version>1.8.1</flink.version>
    <!-- Scala binary (major) version; used as the _${scala.binary.version} suffix of the artifacts below -->
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <!-- Java API (org.apache.flink.api.java.*, e.g. ParameterTool used by the examples) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Java DataStream API.
         provided: compiled against but not packaged into the job jar; whatever runs the job
         (cluster or IDE run configuration) must supply it on the classpath.
         NOTE(review): the other Flink artifacts here use the default compile scope -
         consider making the scopes consistent. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Scala API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Scala DataStream API (org.apache.flink.streaming.api.scala._) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Flink WordCount Java版
package com.bairong.flink.java;

import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* author: YangYunhe
* date: 2019/7/22 19:32
* description: 每隔1s对过去2s的数据进行WordCount
*/
public class SocketWindowWordCountJava {
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word ": " count;
}
}
public static void main(String[] args) throws Exception {
// 1. 解析外部参数,获取要监听的主机、端口,没有配置则取默认值localhost:9999
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host", "localhost");
int port = tool.getInt("port", 9999);
// 2. 获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 3. 初始化数据
DataStreamSource<String> source = env.socketTextStream(host, port);
// 4. 计算,扁平化,每个单次计数为1,分组,累加次数
SingleOutputStreamOperator<WordWithCount> counts = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
String[] words = line.split("\s ");
for(String word : words) {
collector.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount word1, WordWithCount word2) throws Exception {
return new WordWithCount(word1.word, word1.count word2.count);
}
});
/*
* 如果只是简单的相加,可以直接使用sum()方法
* .keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).sum("count")
*/
// 5. 打印结果,设置并行度
counts.print().setParallelism(1);
// 6. 开启流任务,这是一个action算子,将触发计算
env.execute("SocketWindowWordCountJava");
}
}
3. 运行代码
(1) 设置程序参数,指定 host 和 port,例如 `--host localhost --port 9999`(未指定时默认为 localhost:9999)
(2) 在服务器上执行 nc 命令,监听该端口,例如 `nc -lk 9999`
(3) 运行程序
(4) 在 nc 会话中输入一些单词,然后在程序控制台查看结果
Flink WordCount 程序Java版就完成咯。
4. Flink WordCount Scala版
package com.bairong.flink.scala

import org.apache.flink.api.java.utils.ParameterTool
// The wildcard import is required: besides the Scala DataStream API it brings into scope the
// implicit TypeInformation factory that operators such as flatMap/map need to compile.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Flink WordCount (Scala version).
 *
 * Every 1 second, counts the words received on a socket over the previous 2 seconds
 * (sliding window of size 2 s, slide 1 s).
 *
 * Program arguments: `--host <host>` (default `localhost`) and `--port <port>` (default `9999`).
 *
 * author: YangYunhe, date: 2019/7/22 21:14
 */
object SocketWindowWordCountScala {

  /** Builds and runs the streaming job; `args` may carry `--host` and `--port`. */
  def main(args: Array[String]): Unit = {
    val tool = ParameterTool.fromArgs(args)
    val host = tool.get("host", "localhost")
    val port = tool.getInt("port", 9999)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.socketTextStream(host, port)

    // Split on runs of whitespace and drop the empty token produced by leading whitespace,
    // then count (word, 1) pairs per word in a 2 s window sliding every 1 s.
    source
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum(1)
      .print()
      .setParallelism(1)

    env.execute("SocketWindowWordCountScala")
  }
}
测试过程同上。
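需要注意的是,Scala API 必须用通配符整体导入:
import org.apache.flink.streaming.api.scala._
否则代码编译会报错,例如 `could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`。这是因为 flatMap、map 等算子所需的隐式 TypeInformation 正是由这个导入提供的。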