Building Your First Flink Application: WordCount

2020-07-10 11:59:26

This article is roughly 5,143 characters; reading time is about 13 minutes.

A hands-on "hello world" for Flink.

Use Maven to bootstrap a first Flink WordCount application, package it, upload it to a Flink standalone cluster, and run it.

1 Purpose of This Document

  • Generate a Flink template application with Maven
  • Develop the WordCount application

2 Building the Maven Project

Change into the directory where the project should live and generate a Maven project from the Flink quickstart archetype:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.1

Running this command prompts for the Maven project's groupId, artifactId, and version; enter them as prompted.
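
The interactive prompt looks roughly like this (illustrative values: com.eights matches the package used in the code below, while flink-wordcount is just a placeholder project name):

Define value for property 'groupId': com.eights
Define value for property 'artifactId': flink-wordcount
Define value for property 'version' 1.0-SNAPSHOT: :    (press Enter to accept the default)
Define value for property 'package' com.eights: :      (press Enter to accept the default)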

Import the project into IDEA, add the flink-scala dependencies below, and remove the scope of the Java dependencies that come with the template so the job can also run directly inside the IDE:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
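
These snippets rely on the flink.version and scala.binary.version Maven properties. The quickstart pom defines them already; a minimal sketch (1.10.1 matches the archetype version above, while 2.11 is an assumed Scala binary version, adjust it to match your pom):

    <properties>
      <flink.version>1.10.1</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
    </properties>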

Add the Scala compiler plugin, together with the maven-assembly-plugin so that packaging produces a fat jar containing all dependencies:

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
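
Optionally, the assembly plugin can also write a Main-Class entry into the jar manifest so the entry class does not have to be typed in when submitting the job; a sketch, assuming the Java streaming job below (com.eights.StreamingJob) as the entry point:

        <configuration>
          <archive>
            <manifest>
              <mainClass>com.eights.StreamingJob</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>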

3 Scala

StreamingWordCount

Local debugging
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingWordCount {

  val HOST:String = "localhost"
  val PORT:Int = 9001

  /**
   * stream word count
   * @param args input params
   */
  def main(args: Array[String]): Unit = {

    //get streaming env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //get socket text stream
    val wordsDstream: DataStream[String] = env.socketTextStream(HOST, PORT)

    import org.apache.flink.api.scala._

    //word count
    val wordRes: DataStream[(String, Int)] = wordsDstream.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordRes.print()
    
    env.execute("Flink Streaming WordCount!")
  }
}

Before starting the application, open a socket listener in a terminal (-l listens, -k keeps the listener alive across connections); the socket source needs a live connection at startup:

nc -lk 9001

Then start StreamingWordCount and type a stream of comma-separated words into the netcat terminal.

The running counts appear in the streaming application's console.
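
For example, typing spark,flink,flink into the netcat terminal yields incremental counts like the following (illustrative; print() may prefix each line with a subtask index such as "3> "):

(spark,1)
(flink,1)
(flink,2)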

That completes local debugging of the streaming word count.

Running on the cluster

Start the cluster using the flink-1.10.1 distribution built in the earlier post:

./bin/start-cluster.sh

Visit localhost:8081 and the Flink web UI appears.

On the Submit New Job page, upload the application jar (running a Maven package build is enough to produce it) and click Submit to run.
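
A minimal sketch of the packaging step (the jar name depends on the artifactId and version chosen during archetype generation; flink-wordcount is the placeholder used earlier):

mvn clean package
# the assembly plugin writes the fat jar to:
# target/flink-wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar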

Type comma-separated words into the netcat terminal.

Check the Stdout tab under Task Managers in the web UI to see the counts.

BatchWordCount

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCount {

  /**
   * batch word count
   *
   * @param args input params
   */
  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val words: DataSet[String] = env.fromElements("spark,flink,hbase", "impala,hbase,kudu", "flink,flink,flink")

    //word count
    val wordRes: AggregateDataSet[(String, Int)] = words.flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    wordRes.print()
  }
}

The output is as follows:
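
Counting by hand across the three input strings, flink appears four times and hbase twice, so the printed counts should be the following (line order may vary between runs):

(spark,1)
(impala,1)
(kudu,1)
(hbase,2)
(flink,4)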

4 Java

BatchWordCount

package com.eights;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> words = env.fromElements("spark,flink,hbase", "impala,hbase,kudu", "flink,flink,flink");

        AggregateOperator<Tuple2<String, Integer>> wordCount = words.flatMap(new WordLineSplitter())
                .groupBy(0)
                .sum(1);

        wordCount.print();

    }

    public static final class WordLineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] wordsArr = s.split(",");

            for (String word : wordsArr) {
                if (!StringUtils.isNullOrWhitespaceOnly(word)) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }

        }
    }
}

The output matches the Scala batch job above.

StreamingWordCount

package com.eights;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String HOST = "localhost";
        int PORT = 9001;

        DataStreamSource<String> wordsSocketStream = env.socketTextStream(HOST, PORT);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordRes = wordsSocketStream.flatMap(new WordsLineSplitter())
                .keyBy(0)
                .sum(1);

        wordRes.print();

        // execute program
        env.execute("Flink Streaming Java API Word Count");
    }

    private static class WordsLineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] wordsArr = s.split(",");

            for (String word : wordsArr) {
                if (!StringUtils.isNullOrWhitespaceOnly(word)) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

The output matches the Scala streaming job: type comma-separated words into the netcat terminal and the incremental counts are printed.

PS:

These posts exist mainly as notes to myself: a record of my learning path through big-data components, including the pitfalls and how I worked through them. The plan is to keep writing two posts a week and look back on this big-data journey a year from now.
