Flink 的三种WordCount(文末领取Flink书籍)

2022-09-21 21:09:11 浏览数 (1)

Hi,大家好,我是 Johngo 呀!

今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。

目标:通过每天一小会儿,熟悉 Flink 大大小小知识点。

环境版本

JDK:1.8 Flink:1.13.6 Scala:2.12 github:https://github.com/xiaozhutec/FlinkProject1.13.6.git

创建Flink 工程网上已经很多说明方法了,这里先不赘述,以下全部的代码使用 IDEA 进行编码。

本文讲解的 WordCount 程序是大数据的入门程序。

WordCount 程序是在不同上下文环境下实现的,是一个入门版本,可以跟着一步一步实现起来。包括 Streaming 和 Batch 以及 SQL 的简单案例。

上述所有的 Flink 语义都会在后面分篇章详细赘述。

基础配置

首先pom.xml 中要配置的依赖是:

provided 选项在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用。

版本依赖

代码语言:javascript复制
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.6</flink.version>
    <scala.version>2.12</scala.version>
</properties>

java 相关依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

scala 相关依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

另外,pom文件中镜像文件建议配置maven仓库,国内下载速度会快,如果找不到对应的镜像文件,需要切换到国外仓库。

代码语言:javascript复制
<repositories>
    <repository>
        <id>central</id>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <layout>default</layout>
        <!-- 是否开启发布版构件下载 -->
        <releases>
            <enabled>true</enabled>
        </releases>
        <!-- 是否开启快照版构件下载 -->
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

语言界的 hello word,大数据界的 WordCount,都是一个入门Demo。

今天咱们也按照这个入门的 Demo,把 Flink 相关代码捋顺。

包括 Streaming、Batch 以及 Flink Sql 三方面分别来实现。

Streaming WordCount

先来分析一个 Streaming WordCount。

为了模仿流式计算,咱们在本地利用 netcat 命令 nc -l {port}来进行模仿数据产出。

同时,咱们实现的功能是:每隔 1s 计算过去 2s 内产出数据各单词的个数,也就是实现每隔1s计算过去 2s 的 WordCount 程序。

将窗口内接收到的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

大致处理的流程如上所示,现在来一步一步实现这个案例。

先开始创建 Flink 的运行环境:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后指定了数据 Source 源,以及 Source 源的一些配置:

代码语言:javascript复制
String hostname = "127.0.0.1";
int port = 8899;
String delimiter = "n";
// 链接 socket 获取数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

之后就进行了数据的平铺,分组,窗口计算等操作。

另外,程序中实现了一个内部类WordWithCount,用来表示单词的 key 和 count。

利用 keyBy()函数对 key进行分组。

window函数表示每一个滑动窗口,SlidingProcessingTimeWindows实现每隔 1s 对过去 2s 进行计数。

后面的教程会详细讲解 Windows 相关知识,这里仅做入门学习。

下面整体看下代码:

代码语言:javascript复制
public class SocketWindowWCJava {
    public static void main(String[] args) throws Exception {
        // 获取流式运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "127.0.0.1";
        int port = 8899;
        String delimiter = "n";
        // 获取数据源(Socket数据源,单词以逗号分割)
        DataStreamSource<String> source = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<WC> res = source.flatMap(new FlatMapFunction<String, WC>() {

                    @Override
                    public void flatMap(String value, Collector<WC> out) throws Exception {
                        String[] splits = value.split(",");
                        for (String split : splits) {
                            out.collect(new WC(split, 1));
                        }
                    }
                }).keyBy(x -> x.word)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2)))  // 每隔1秒,统计过去2秒的数据
                // .sum("count");
                .reduce(new ReduceFunction<WC>() {
                    @Override
                    public WC reduce(WC t1, WC t2) throws Exception {
                        return new WC(t1.word, t1.count t2.count);
                    }
                });
        
        res.print().setParallelism(1);
        env.execute("SocketWindowWCJava");
    }

    public static class WC {
        public String word;
        public int count;

        public WC() {}
        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{"  
                    "word='"   word   '''  
                    ", count="   count  
                    '}';
        }
    }
}

现在把程序执行起来,先在本地起一个netcat程序,然后启动Flink程序:

代码语言:javascript复制
$ nc -lk 8899
flink,flink,spark
hadoop,flink

之后,控制台进行了相应的打印:

用 java 实现完,接下来用 scala 也实现一下相同的逻辑,有兴趣的朋友可作参考:

代码语言:javascript复制
object SocketWindowWCScala {
  def main(args: Array[String]): Unit = {
    // 获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val hostname = "localhost"
    val port = 8899
    val delimiter = 'n'
    val source = env.socketTextStream(hostname, port, delimiter)

    import org.apache.flink.api.scala._
    // 数据格式:word,word2,word3
    val res = source.flatMap(line => line.split(',')) // 将每一行按照逗号打平
      .map(word => WC(word, 1))
      .keyBy(x => x.word)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2)))
      .reduce((v1, v2) => WC(v1.word, v1.count   v2.count))

    res.print("data: ").setParallelism(1)

    env.execute("SocketWindowWCScala")
  }

  case class WC(word: String, count: Long)
}

依然是启动 flink 程序和 nc:

代码语言:javascript复制
nc -lk 8888
flink,flink,spark
hadoop,flink

再看控制台的打印结果,是和咱们想实现的一致:

再次注意:窗口的使用方式在新版本中有较大的区别,这个咱们在后面会详细把这部分进行讲解。

Batch WordCount

批处理程序,这里用一个文本来作为数据源。

将文本中的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

处理逻辑依然如图所示,然后下面咱们也创建一个文本如图里的内容(src/main/datas/dm.csv):

代码语言:javascript复制
Java,Fink
Scala 
Streaming
Flink,Java 
Scala
Batch,Scala

首先创建 Flink 运行环境

代码语言:javascript复制
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

之后进行读取文件

代码语言:javascript复制
DataSource text = env.readTextFile(filePath);

然后通过实现 FlatMapFunction 接口进行数据的打平操作(上面类 Tokenizer 的实现)。

最后进行分组求和,Batch WordCount 全部完成!

下面看 Batch 整体代码:

代码语言:javascript复制
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        String filePath = "./datas/dm.csv";
        String resultPath = "./datas/wc_rst.csv";

        // 获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> source = env.readTextFile(filePath);

        AggregateOperator<Tuple2<String, Integer>> res = source.flatMap(new JGFlatMapFunction())
                .groupBy(0)
                .sum(1);
        res.print();
        res.writeAsCsv(resultPath).setParallelism(1);

        env.execute("WordCountJava");
    }

    public static class JGFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
 
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] splits = value.split(",");
            for (String split : splits) {
                out.collect(Tuple2.of(split, 1));
            }
        }
    }
}

程序中,通过读取./datas/dm.csv中的数据,最后计算结果打印到控制台以及存储结果数据到./datas/wc_rst.csv

执行起来,看打印结果:

求得给定文件的 WordCount 的结果。

下面用 Scala 实现一次:

代码语言:javascript复制
object WordCountScala {
  def main(args: Array[String]): Unit = {
    val filePath = "./datas/dm.csv"
    val resultPath = "./datas/wc_rst.csv"

    // 获取运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile(filePath)

    //引入隐式转换
    import org.apache.flink.api.scala._
    val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    counts.print()
    counts.writeAsCsv(resultPath, "n", " ")
  }
}

用 Scala 实现起来就很简单了。

注意:这块如果代码出错的话,试着找找导入的包是否正确。

Flink SQL WordCount

尤其是有过 MapReduce 和 Hive 经历的朋友,就可以和它们放在一起做比较,一个复杂,一个简单。

比如说下面的 SQL 语句,就一句就可以省去上面那么多的代码工作量。

代码语言:javascript复制
SELECT word, COUNT(*) FROM table GROUP BY word;

下面利用 FlinkSQL 实现 WordCount 功能。

首先,pom 文件必须要添加的依赖:

代码语言:javascript复制
<!-- use the Table API & SQL for defining pipelines.-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- run the Table API & SQL programs locally within your IDE,-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- SQL Client-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>

先用 Java 来实现 FlinkSQL,将 nc 程序起来,进行按照逗号分割进行测试。

代码语言:javascript复制
$ nc -lk 8899
spark,flink,spark
spark,flink,spark
...

a. 首先创建 Flink 的运行环境以及 SQL api 环境:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

b. nc 输出,将字符串转换为 (word, 1L) 的格式:

代码语言:javascript复制
SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
        .flatMap(new FlatMapFunction<String, WC>() {
    @Override
    public void flatMap(String value, Collector<WC> out) throws Exception {
        String[] splits = value.split(",");
        for (String split : splits) {
            out.collect(new WC(split, 1L));
        }
    }
});

c. 注册成表,转为视图&查询

代码语言:javascript复制
Table WordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("WC", WordCountTable);
Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");

d. 转为 Stream 并且打印出来

代码语言:javascript复制
tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);

下面看整体代码:

代码语言:javascript复制
public class WordCountWithSQLJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
                .flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] splits = value.split(",");
                for (String split : splits) {
                    out.collect(new WC(split, 1L));
                }
            }
        });

        //DataStream 转sql & 查询
        Table WordCountTable = tableEnv.fromDataStream(dataStream);
        tableEnv.createTemporaryView("WC", WordCountTable);
        Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");

        // 将结果数据转换为DataStream toRetractStream toAppendStream
        tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
        env.execute("WCSQLJava");
    }

    public static class WC {
        public String word;
        public long count;

        public  WC() {}
        public WC(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WC {"  
                    "word='"   word   '''  
                    ", count="   count  
                    '}';
        }
    }
}

整体代码执行结果:

其中, 是操作后,I 是插入,U是更新,D是删除。例如:-U是撤回前的数据, U是更新后的数据

true代表数据插入,false代表数据的撤回

Java 实现后,下面再用 Scala 来实现一次,代码逻辑一致,可以参考:

代码语言:javascript复制
object WordCountSQLScala {
  def main(args: Array[String]): Unit = {
    // 创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    import org.apache.flink.api.scala._
    // 从 nc 接入数据, 数据格式:word,word2,word3
    val dataStream = env.socketTextStream("localhost", 8899, 'n')
      .flatMap(line => line.split(','))
      .map(word => WC(word, 1L))

    // 转换为一个表(table) & 查询
    val inputTable = tableEnv.fromDataStream(dataStream)
    tableEnv.createTemporaryView("WC", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word")

    // toAppendStream toRetractStream
    val resValue = tableEnv.toChangelogStream(resultTable)
    resValue.print().setParallelism(1)

    env.execute("WordCountSQLScala")
  }

  case class WC(word: String, count: Long)
}

代码执行的结果也一致:

总结

今天实现了大数据的经典案例 WordCount,然后在不同场景下的实现。包括 Streaming 和 Batch,以及 Flink SQL 的实现。

该篇文章还只是一个入门级的程序,后面将会各重要点进行详细阐述。

0 人点赞