大数据Flink-Java学习之旅第一篇

2021-12-24 11:38:01 浏览数 (1)

一、Flink 简介

1、初识 Flink

Flink 起源于 Stratosphere 项目,Stratosphere 是在 2010~2014 年由 3 所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014 年 4 月 Stratosphere 的代 码被 复制 并捐赠 给了 Apache 软件基 金会, 参加 这个 孵化项 目的 初始 成员 是Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。

在德语中,Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为 logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而 Flink 的松鼠 logo 拥有可爱的尾巴,尾巴的颜色与 Apache 软件基金会的 logo 颜色相呼应,也就是说,这是一只 Apache 风格的松鼠。

Flink 项目的理念是:“Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

2、Flink 的重要特点

2.1、事件驱动型(Event-driven)

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以 kafka 为代表的消息队列几乎都是事件驱动型应用。

与之不同的就是 SparkStreaming 微批次,如图:

事件驱动型:

2.2、流与批的世界观

批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。

流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在 spark 的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。

而在 flink 的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

无界数据流:无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理 event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取 event,以便能够推断结果完整性。

有界数据流:有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。

这种以流为世界观的架构,获得的最大好处就是具有极低的延迟。

2.3、分层 api

Flink 几大模块

  • Flink Table & SQL
  • Flink Gelly
  • Flink CEP

二、快速上手

1、搭建 maven 工程 FlinkTutorial

代码语言:javascript复制
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
复制代码

2、批处理 wordcount

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    // 创建执行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // 从文件中读取数据
    String inputPath = "word.txt";
    DataSet<String> inputDataSet = env.readTextFile(inputPath);

    // 对数据集进行处理,按空格分词展开,转换成(word,1)二元组进行统计
    inputDataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }).groupBy(0) // 按照第一个位置的word分组
            .sum(1) // 将第二个位置上的数据求和
            .print();

}
复制代码

3、流处理 wordcount

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    // 创建流式处理环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 用parameter tool工具从程序启动参数中提取配置项
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    // nc -lk 7777
    // --host hadoop102 --port 7777
    String hostname = parameterTool.get("host", "hadoop102");
    int port = parameterTool.getInt("port", 7777);

    DataStream<String> inputDataSet = env.socketTextStream(hostname, port);

    // 基于数据流进行转换计算
    inputDataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }).keyBy(0)
            .sum(1)
            .print();

    // 执行任务
    env.execute();
}

0 人点赞