零距离接触Flink:全面解读流计算框架入门与实操指南

2023-09-14 10:48:52 浏览数 (1)

前言

Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。

Flink环境搭建

1. 环境准备

Flink支持在Linux、MacOS和Windows三大平台上部署。本文以Linux环境为例。

需要的软件依赖如下:

  • JDK 8或以上版本
  • Maven 3.5
  • Flink 1.14.5版本
代码语言:javascript复制
# 安装JDK
yum install -y java-1.8.0-openjdk-devel

# 安装Maven
yum install -y maven

接着下载Flink压缩包进行解压:

代码语言:javascript复制
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz 
tar -xvf flink-1.14.5-bin-scala_2.12.tgz

2.单机模式运行Flink

单机模式下,JobManager和TaskManager均运行在同一台机器上。

代码语言:javascript复制
# 启动JobManager
./bin/start-cluster.sh

# 提交并运行WordCount程序
./bin/flink run examples/streaming/WordCount.jar

本文以单机模式为例进行讲解。实际生产环境中,建议部署在集群模式下运行。

3. 分布式集群模式

在集群模式下,JobManager和TaskManager会部署在不同节点上。

  • 首先在一台机器上启动ResourceManager
  • 在其他Worker节点上启动TaskManager
  • 提交Job到JobManager进行调度和运行

以此实现Flink在分布式环境下高可靠且高性能的计算。

4. 编写WordCount程序

WordCount是一个流式WordCount程序,读取文本源头,以单词为单位进行计数统计。

代码语言:javascript复制
// 定义文本源DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999); 

//将每行内容切分转换成单词列表
DataStream<String> words = text
  .flatMap(new FlatMapFunction<String, String>() {
     public void flatMap(String value, Collector<String> out) {
       String[] split = value.toLowerCase().split("\W ");
       // ...
    }
  });
  
//按单词进行计数统计        
DataStream<Tuple2<String, Long>> counts = words
  .keyBy(value -> value)
  .sum(1);
  
//输出结果
counts.print();

5. 运行和结果

编译打包项目,使用FlinkClient提交Job:

代码语言:javascript复制
mvn clean package

bin/flink run target/wordcount-1.0-SNAPSHOT.jar

运行程序,使用netcat工具发送输入字符串,可以实时看到统计结果:

代码语言:javascript复制
nc localhost 9999
hello world bye
hello again

6.代码示例

这里提供一个完整的WordCount流处理程序代码示例:

代码语言:javascript复制
// 导入Flink相关包
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WordCount {

  public static void main(String[] args) throws Exception {

    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 从文件读取文本行数据源
    DataStream<String> text = env.addSource(new MySourceFunction());

    // 将每行内容切分成单词
    DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
      public void flatMap(String value, Collector<String> out) {
        String[] splits = value.split("\s ");
        for (String word : splits) {
          out.collect(word);
        }
      }
    });

    // 按单词进行分组计数
    DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
      .timeWindow(Time.seconds(5)) 
      .sum(1);

    // 打印最终结果
    result.print();

    // 执行任务
    env.execute("WordCount");

  }

  // 自定义文本数据源
  public static class MySourceFunction implements SourceFunction<String> {

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      // 从文件或集合读取文本 
      // ...
      ctx.collect("hello world"); 
    }

    @Override
    public void cancel() {

    }
  }

}

该示例从文件读取文本行,进行词频统计,并以对象流的方式输出结果。希望能给您一个完整代码实例的参考!

Flink与Yarn集成

Flink可以利用Yarn资源管理器来管理和调度Flink作业的执行。主要有以下步骤:

1. 安装和配置Yarn

安装Hadoop并配置Yarn资源管理器。

2. 配置Flink支持Yarn

修改flink-conf.yaml配置文件,添加如下配置:

代码语言:javascript复制
yarn.distributed.enabled: true

3. 打包Flink项目为Yarn应用

代码语言:javascript复制
mvn package -Pyarn 

4. 提交Flink作业到Yarn

代码语言:javascript复制
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar

-m 参数指定使用Yarn作为资源管理器,-yn -ys 分配给任务的Container数量。

5. Yarn WebUI监控作业

可以在Yarn ResourceManager WebUI中查看和监控Flink作业状态。

6. 停止和重启作业

使用Flink Cli同样可以停止和重启在Yarn上运行的作业。

与此同时,Yarn也能根据负载自动扩缩容Flink作业上的Container数量。这样实现了Flink与Yarn的良好集成。

通过上述步骤就可以利用Yarn的资源管理能力来管理Flink分布式作业的执行了。

Flink通过时间窗口操作sql

Flink通过Table API和SQL来支持时间窗口的操作。

下面通过一个例子来说明:

1. 定义数据源

导入Flink的TableEnvironment:

代码语言:javascript复制
TableEnvironment tableEnv = TableEnvironment.create(env);

从Kafka读取数据注册成Table:

代码语言:javascript复制
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .property(...));

2. 定义表结构

使用DDL定义Table结构:

代码语言:javascript复制
CREATE TABLE inputTable (
  id STRING, 
  timestamp TIMESTAMP,
  ...)
WITH (...); 

3. 定义窗口

使用TUMBLE或HOP动态时间窗口

代码语言:javascript复制
SELECT 
  id, 
  COUNT(*) 
FROM 
  inputTable
GROUP BY 
  TUMBLE(timestamp, INTERVAL '5' MINUTE)

4. 窗口转换

支持窗口函数如SUM、COUNT、MAX等聚合计算:

代码语言:javascript复制
SELECT 
  SUM(amount) 
FROM 
  inputTable
GROUP BY 
  HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)

5. 输出结果

将结果输出到Kafka或打印:

代码语言:javascript复制
tableEnv.toRetractStream[Row]...

通过Table API和SQL的时间窗口支持,可以更高效地操作和处理时间序列数据流。开发者可以使用熟悉的SQL语法进行流处理。

6. sql任务代码示例

这里提供一个完整的使用SQL实现单词计数的示例:

代码语言:javascript复制
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env); 

// 从Kafka读取文本行数据
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .topic("kafka_topic"))
  .withFormat(new SimpleStringSchema())
  .createTemporaryTable("lines");

// 分词表
tableEnv.executeSql(
  "CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS "  
  "SELECT "  
  "  ROW_NUMBER() OVER() AS id, "  
  "  word "   
  "FROM lines, LATERAL(FLATTEN(SPLIT(lines, ' ')))";

// 窗口聚合表   
tableEnv.executeSql(
  "CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS "  
  "SELECT "  
  "  word, "  
  "  COUNT(*) AS count "  
  "FROM words "  
  "GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");

// 输出结果
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");

// 执行程序
env.execute();

这个完整示例包含数据输入、分词、窗口聚合和结果输出的全流程SQL定义。希望对您理解SQL实现流处理过程有帮助。

时间窗口说明

1. 滚动窗口

  • 滚动窗口分为定长窗口(TUMBLE)和滑动窗口(HOP)两种。
  • 定长窗口将事件锁定到连续的固定大小时间窗口中,窗口不重合。
  • 滑动窗口以固定时间间隔滑动,窗口重合部分可重复计算。

2. 窗口分配

  • 每条事件根据时间戳分配到对应的窗口份组中。
  • 窗口分配采用窗函数TIMESTAMP_WINDOW(timeField,窗口大小)实现。

3. 窗口聚合

  • 事件分配完毕后,对每个窗口执行聚合操作(如COUNT、SUM等)。
  • 窗口会将中间结果保存在状态后端(如RocksDB)。

4. 窗口结果输出

  • 窗口被关闭时(到期),将最终结果输出。
  • 也可以提前输出或定期输出中间结果。

5. 状态管理

  • 窗口状态会进行快照保存,实现断点续传重启能力。
  • 状态由KeyedStateBackend管理,比如RocksDB。

所以Flink时间窗口的原理就是:根据时间戳分配事件到窗口,窗口聚合操作更新状态,窗口关闭时输出结果。它独立于算子,为流处理引入了时间的概念。

6. 同批次时间窗口处理逻辑

如果一次从Kafka拉取的数据中,有一半的数据在当前时间窗口内,一半在窗口外,Flink会进行如下处理:

  • 先根据事件时间戳,将数据分配到对应的时间窗口分区组(keyed state)中。
  • 对每个时间窗口分区组单独处理:
    • 时间窗口内的数据按正常流程进行聚合计算。
    • 时间窗口外的数据不会参与当前窗口的聚合,但是会加入该key的back pressure。
  • 窗口结果输出时:
    • 只输出当前窗口已经关闭的分区组的结果。其他分区组处于开启状态,不会输出。
  • 周期性检查窗口状态:
    • 关闭那些超出时间范围的过期窗口。
    • 对还未到期的窗口继续累积状态,待到期后输出结果。

所以Flink可以正确区分时间窗口内外的数据:

  • 窗口内数据参与当前窗口计算
  • 窗口外数据加入back pressure,未来窗口处理
  • 只输出实际到期窗口的结果

这样保证了时间正确性,不会导致窗口结果计算错误

0 人点赞