001. Flink Getting-Started Example: Real-Time WordCount

2019-07-25 16:56:41

1. Maven dependencies

<properties>
    <!-- Flink version -->
    <flink.version>1.8.1</flink.version>
    <!-- Scala binary version -->
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Flink WordCount, Java version

package com.bairong.flink.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * author: YangYunhe
 * date: 2019/7/22 19:32
 * description: every 1 s, run WordCount over the data of the past 2 s
 */
public class SocketWindowWordCountJava {

    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ": " + count;
        }
    }

    public static void main(String[] args) throws Exception {

        // 1. Parse the external arguments to get the host and port to listen on; defaults to localhost:9999
        ParameterTool tool = ParameterTool.fromArgs(args);
        String host = tool.get("host", "localhost");
        int port = tool.getInt("port", 9999);

        // 2. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3. Create the source: read lines of text from the socket
        DataStreamSource<String> source = env.socketTextStream(host, port);

        // 4. Compute: split each line into words, emit each word with a count of 1, key by word, sum the counts per window
        SingleOutputStreamOperator<WordWithCount> counts = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
                String[] words = line.split("\\s+");
                for(String word : words) {
                    collector.collect(new WordWithCount(word, 1L));
                }

            }
        }).keyBy("word")
        .timeWindow(Time.seconds(2), Time.seconds(1))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount word1, WordWithCount word2) throws Exception {
                return new WordWithCount(word1.word, word1.count + word2.count);
            }
        });
        /*
         * For a plain sum, the sum() method can be used directly:
         * .keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).sum("count")
         */

        // 5. Print the results, with the parallelism of the print sink set to 1
        counts.print().setParallelism(1);

        // 6. Start the streaming job; the dataflow is built lazily, so nothing runs until execute() is called
        env.execute("SocketWindowWordCountJava");

    }

}
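
The job above uses a window size of 2 s and a slide of 1 s, so every record contributes to two overlapping windows. The following is a minimal sketch in plain Java of how such windows can be derived from a timestamp. It assumes non-negative millisecond timestamps and windows aligned to the epoch with no offset. The class and method names are hypothetical and are not part of Flink; the sketch models the idea, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /** Returns the [start, end) bounds, in milliseconds, of every sliding window that contains the timestamp. */
    public static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp
        // (assumes a non-negative timestamp and no window offset).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Size 2 s and slide 1 s as in the job above; the timestamp 2500 ms is an arbitrary example.
        for (long[] w : windowsFor(2500L, 2000L, 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints:
        // [2000, 4000)
        // [1000, 3000)
    }
}
```

Because each record lands in two windows, a word typed once normally shows up in two consecutive outputs of the job.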

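3. Running the code

(1) Set the program arguments to specify the host and port, for example: --host <server-host> --port 9999

(2) Run the nc command on the server so that it listens on that port, for example: nc -lk 9999

(3) Run the program

(4) Type some words into the nc session on the server and check the results in the program's console

That completes the Java version of the Flink WordCount program.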
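
The way the host and port arguments fall back to localhost:9999 can be illustrated with a small plain-Java sketch. It is a simplified stand-in for illustration only, not Flink's ParameterTool: the class name ArgsSketch and its parse method are hypothetical, and it only understands "--key value" pairs.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {

    /** Parses "--key value" pairs into a map; a trailing key without a value is ignored. */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            // Strip the leading "--" and store the following argument as the value.
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(args);
        // Same defaults as the WordCount job: localhost and 9999.
        String host = params.getOrDefault("host", "localhost");
        int port = Integer.parseInt(params.getOrDefault("port", "9999"));
        System.out.println("Connecting to " + host + ":" + port);
    }
}
```

Run without arguments, the sketch resolves to localhost:9999, which matches a local nc session listening on port 9999.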

4. Flink WordCount, Scala version

package com.bairong.flink.scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * author: YangYunhe
  * date: 2019/7/22 21:14
  * description: Flink WordCount, Scala version
  */
object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {

    val tool = ParameterTool.fromArgs(args)
    val host = tool.get("host", "localhost")
    val port = tool.getInt("port", 9999)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source: DataStream[String] = env.socketTextStream(host, port)

    source.flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum(1)
      .print()
      .setParallelism(1)

    env.execute("SocketWindowWordCountScala")

  }

}
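
What the pipeline computes for a single window (split, pair each word with 1, group by word, sum) can be sketched in plain Java without Flink. The class name WindowContentsSketch and the sample lines are hypothetical. One deliberate difference from the jobs above: splitting on whitespace yields an empty first token when a line starts with whitespace, and this sketch skips such tokens, whereas the jobs above would count them as a word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowContentsSketch {

    /** Counts the words in the lines that fall into one window, ignoring empty tokens. */
    public static Map<String, Long> countWords(List<String> lines) {
        // TreeMap keeps the words sorted so the printed result is deterministic.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // produced by leading whitespace or a blank line
                }
                // Equivalent to emitting (word, 1) and summing per key.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical lines assumed to arrive within the same window.
        List<String> linesInWindow = Arrays.asList("hello flink", "  hello world");
        System.out.println(countWords(linesInWindow));
        // prints: {flink=1, hello=2, world=1}
    }
}
```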

The test procedure is the same as above.

One thing to note: all members of the Scala API package must be imported:

import org.apache.flink.streaming.api.scala._

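Otherwise the code fails to compile, because the implicit TypeInformation values that the Scala API methods require are not in scope.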
