今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。
目标:通过每天一小会儿,熟悉 Flink 大大小小知识点。
环境版本
JDK:1.8 Flink:1.13.6 Scala:2.12 github:https://github.com/xiaozhutec/FlinkProject1.13.6.git
创建Flink 工程网上已经很多说明方法了,这里先不赘述,以下全部的代码使用 IDEA 进行编码。
本文讲解的 WordCount 程序是大数据的入门程序。
WordCount 程序是在不同上下文环境下实现的,是一个入门版本,可以跟着一步一步实现起来。包括 Streaming 和 Batch 以及 SQL 的简单案例。
上述所有的 Flink 语义都会在后面分篇章详细赘述。
基础配置
首先pom.xml 中要配置的依赖是:
provided 选项在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用。
版本依赖
代码语言:javascript复制<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version>
<scala.version>2.12</scala.version>
</properties>
java 相关依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
scala 相关依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
另外,pom文件中镜像文件建议配置maven仓库,国内下载速度会快,如果找不到对应的镜像文件,需要切换到国外仓库。
代码语言:javascript复制<repositories>
<repository>
<id>central</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<layout>default</layout>
<!-- 是否开启发布版构件下载 -->
<releases>
<enabled>true</enabled>
</releases>
<!-- 是否开启快照版构件下载 -->
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
语言界的 hello word,大数据界的 WordCount,都是一个入门Demo。
今天咱们也按照这个入门的 Demo,把 Flink 相关代码捋顺。
包括 Streaming、Batch 以及 Flink Sql 三方面分别来实现。
Streaming WordCount
先来分析一个 Streaming WordCount。
为了模仿流式计算,咱们在本地利用 netcat
命令 nc -l {port}
来进行模仿数据产出。
同时,咱们实现的功能是:每隔 1s 计算过去 2s 内产出数据各单词的个数,也就是实现每隔1s计算过去 2s 的 WordCount 程序。
将窗口内接收到的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。
大致处理的流程如上所示,现在来一步一步实现这个案例。
先开始创建 Flink 的运行环境:
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后指定了数据 Source 源,以及 Source 源的一些配置:
代码语言:javascript复制String hostname = "127.0.0.1";
int port = 8899;
String delimiter = "n";
// 链接 socket 获取数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
之后就进行了数据的平铺,分组,窗口计算等操作。
另外,程序中实现了一个内部类WordWithCount
,用来表示单词的 key 和 count。
利用 keyBy()
函数对 key
进行分组。
用window
函数表示每一个滑动窗口,SlidingProcessingTimeWindows
实现每隔 1s 对过去 2s 进行计数。
后面的教程会详细讲解 Windows 相关知识,这里仅做入门学习。
下面整体看下代码:
代码语言:javascript复制public class SocketWindowWCJava {
public static void main(String[] args) throws Exception {
// 获取流式运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "127.0.0.1";
int port = 8899;
String delimiter = "n";
// 获取数据源(Socket数据源,单词以逗号分割)
DataStreamSource<String> source = env.socketTextStream(hostname, port, delimiter);
SingleOutputStreamOperator<WC> res = source.flatMap(new FlatMapFunction<String, WC>() {
@Override
public void flatMap(String value, Collector<WC> out) throws Exception {
String[] splits = value.split(",");
for (String split : splits) {
out.collect(new WC(split, 1));
}
}
}).keyBy(x -> x.word)
.window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2))) // 每隔1秒,统计过去2秒的数据
// .sum("count");
.reduce(new ReduceFunction<WC>() {
@Override
public WC reduce(WC t1, WC t2) throws Exception {
return new WC(t1.word, t1.count t2.count);
}
});
res.print().setParallelism(1);
env.execute("SocketWindowWCJava");
}
public static class WC {
public String word;
public int count;
public WC() {}
public WC(String word, int count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WC{"
"word='" word '''
", count=" count
'}';
}
}
}
现在把程序执行起来,先在本地起一个netcat
程序,然后启动Flink
程序:
$ nc -lk 8899
flink,flink,spark
hadoop,flink
之后,控制台进行了相应的打印:
用 java 实现完,接下来用 scala 也实现一下相同的逻辑,有兴趣的朋友可作参考:
代码语言:javascript复制object SocketWindowWCScala {
def main(args: Array[String]): Unit = {
// 获取运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val hostname = "localhost"
val port = 8899
val delimiter = 'n'
val source = env.socketTextStream(hostname, port, delimiter)
import org.apache.flink.api.scala._
// 数据格式:word,word2,word3
val res = source.flatMap(line => line.split(',')) // 将每一行按照逗号打平
.map(word => WC(word, 1))
.keyBy(x => x.word)
.window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2)))
.reduce((v1, v2) => WC(v1.word, v1.count v2.count))
res.print("data: ").setParallelism(1)
env.execute("SocketWindowWCScala")
}
case class WC(word: String, count: Long)
}
依然是启动 flink 程序和 nc:
代码语言:javascript复制nc -lk 8888
flink,flink,spark
hadoop,flink
再看控制台的打印结果,是和咱们想实现的一致:
再次注意:窗口的使用方式在新版本中有较大的区别,这个咱们在后面会详细把这部分进行讲解。
Batch WordCount
批处理程序,这里用一个文本来作为数据源。
将文本中的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。
处理逻辑依然如图所示,然后下面咱们也创建一个文本如图里的内容(src/main/datas/dm.csv):
代码语言:javascript复制Java,Fink
Scala
Streaming
Flink,Java
Scala
Batch,Scala
首先创建 Flink 运行环境
代码语言:javascript复制ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
之后进行读取文件
代码语言:javascript复制DataSource text = env.readTextFile(filePath);
然后通过实现 FlatMapFunction
接口进行数据的打平操作(上面类 Tokenizer 的实现)。
最后进行分组求和,Batch WordCount 全部完成!
下面看 Batch 整体代码:
代码语言:javascript复制public class WordCountJava {
public static void main(String[] args) throws Exception {
String filePath = "./datas/dm.csv";
String resultPath = "./datas/wc_rst.csv";
// 获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> source = env.readTextFile(filePath);
AggregateOperator<Tuple2<String, Integer>> res = source.flatMap(new JGFlatMapFunction())
.groupBy(0)
.sum(1);
res.print();
res.writeAsCsv(resultPath).setParallelism(1);
env.execute("WordCountJava");
}
public static class JGFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] splits = value.split(",");
for (String split : splits) {
out.collect(Tuple2.of(split, 1));
}
}
}
}
程序中,通过读取./datas/dm.csv
中的数据,最后计算结果打印到控制台以及存储结果数据到./datas/wc_rst.csv
执行起来,看打印结果:
求得给定文件的 WordCount 的结果。
下面用 Scala 实现一次:
代码语言:javascript复制object WordCountScala {
def main(args: Array[String]): Unit = {
val filePath = "./datas/dm.csv"
val resultPath = "./datas/wc_rst.csv"
// 获取运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(filePath)
//引入隐式转换
import org.apache.flink.api.scala._
val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }
.map((_, 1))
.groupBy(0)
.sum(1)
counts.print()
counts.writeAsCsv(resultPath, "n", " ")
}
}
用 Scala 实现起来就很简单了。
注意:这块如果代码出错的话,试着找找导入的包是否正确。
Flink SQL WordCount
尤其是有过 MapReduce 和 Hive 经历的朋友,就可以和它们放在一起做比较,一个复杂,一个简单。
比如说下面的 SQL 语句,就一句就可以省去上面那么多的代码工作量。
代码语言:javascript复制SELECT word, COUNT(*) FROM table GROUP BY word;
下面利用 FlinkSQL 实现 WordCount 功能。
首先,pom 文件必须要添加的依赖:
代码语言:javascript复制<!-- use the Table API & SQL for defining pipelines.-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- run the Table API & SQL programs locally within your IDE,-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- SQL Client-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
先用 Java 来实现 FlinkSQL,将 nc 程序起来,进行按照逗号分割进行测试。
代码语言:javascript复制$ nc -lk 8899
spark,flink,spark
spark,flink,spark
...
a. 首先创建 Flink 的运行环境以及 SQL api 环境:
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
b. nc 输出,将字符串转换为 (word, 1L) 的格式:
代码语言:javascript复制SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
.flatMap(new FlatMapFunction<String, WC>() {
@Override
public void flatMap(String value, Collector<WC> out) throws Exception {
String[] splits = value.split(",");
for (String split : splits) {
out.collect(new WC(split, 1L));
}
}
});
c. 注册成表,转为视图&查询
代码语言:javascript复制Table WordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("WC", WordCountTable);
Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");
d. 转为 Stream 并且打印出来
代码语言:javascript复制tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
下面看整体代码:
代码语言:javascript复制public class WordCountWithSQLJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
.flatMap(new FlatMapFunction<String, WC>() {
@Override
public void flatMap(String value, Collector<WC> out) throws Exception {
String[] splits = value.split(",");
for (String split : splits) {
out.collect(new WC(split, 1L));
}
}
});
//DataStream 转sql & 查询
Table WordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("WC", WordCountTable);
Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");
// 将结果数据转换为DataStream toRetractStream toAppendStream
tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
env.execute("WCSQLJava");
}
public static class WC {
public String word;
public long count;
public WC() {}
public WC(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WC {"
"word='" word '''
", count=" count
'}';
}
}
}
整体代码执行结果:
其中, 是操作后,I 是插入,U是更新,D是删除。例如:-U是撤回前的数据, U是更新后的数据
true代表数据插入,false代表数据的撤回
Java 实现后,下面再用 Scala 来实现一次,代码逻辑一致,可以参考:
代码语言:javascript复制object WordCountSQLScala {
def main(args: Array[String]): Unit = {
// 创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
import org.apache.flink.api.scala._
// 从 nc 接入数据, 数据格式:word,word2,word3
val dataStream = env.socketTextStream("localhost", 8899, 'n')
.flatMap(line => line.split(','))
.map(word => WC(word, 1L))
// 转换为一个表(table) & 查询
val inputTable = tableEnv.fromDataStream(dataStream)
tableEnv.createTemporaryView("WC", inputTable)
val resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word")
// toAppendStream toRetractStream
val resValue = tableEnv.toChangelogStream(resultTable)
resValue.print().setParallelism(1)
env.execute("WordCountSQLScala")
}
case class WC(word: String, count: Long)
}
代码执行的结果也一致:
总结
今天实现了大数据的经典案例 WordCount,然后在不同场景下的实现。包括 Streaming 和 Batch,以及 Flink SQL 的实现。
该篇文章还只是一个入门级的程序,后面将会各重要点进行详细阐述。