Flink(二)

2022-10-25 16:02:27 浏览数 (1)

Flink

  • 一、Flink流处理API
    • 1. Environment
    • 2. Source
    • 3. Transform
    • 3* 支持的数据类型
    • 3** 实现UDF函数(更细粒度的控制流)
    • 4. Sink
  • 二、Flink Window API
    • 1. Window概念
    • 2. Window API

流处理系统由于需要支持无限数据集的处理,一般采用一种数据驱动的处理方式。它会提前设置一些算子,然后等到数据到达后对数据进行处理。

为了表达复杂的逻辑,flink在内的分布式流处理引擎,一般采用 DAG(有向无环图) 图来表示整个计算逻辑,其中 DAG 图中的每一个点就代表一个基本的逻辑单元,也就是前面说的算子,由于计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理,最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。

一、Flink流处理API

Environment -> Source -> Transform -> Sink 懒加载模式,需要手动执行。

1. Environment

1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文。自动查询当前运行的方式,返回Local或Remote,调用底层方法。

1.2 createLocalEnvironment 返回本地执行环境,需要在调用时指定默认的并行度。

1.3 createRemoteEnvironment 返回集群执行环境,将Jar包提交到远程服务器。需要在调用时制定JM的IP和端口号,并指定要在集群中运行的Jar包(有变动需要修改源码)。

2. Source

2.1 fromCollection 有界流:从自定义的集合中读取、从文件中读取

无界流:从Kafka中读取数据

代码语言:javascript复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-${kafka.version}_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

先定义Kafka的Properties,env.addSource(new FlinkKafkaConsumer${version}[String](,,));。Flink会将Kafka的Offset作为状态保存,并保证状态一致性。

自定义Source:自定义一个继承SourceFunction类

3. Transform

常见的转换算子:map、flatMap、Filter、KeyBy、(基本)滚动聚合算子、Reduce、(聚合)Split、Select、Connect、CoMap、Union(多流转换)。并行度可以在每个算子后设置。

  • 基本转换算子 (1)map 映射,对每个元素进行一定的变换后,映射为另一个元素。输出泛型可以变化,常用作分词操作。

(2)flatMap 将元素摊平,每个元素可以变为0个、1个、或者多个元素。

(3)Filter 过滤元素。

(4)KeyBy DataStream转换为KeyedStream,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素(内部hash),分区不分流。

  • 聚合算子 (5)滚动聚合算子(Rolling Aggregation) 针对KeyedStream的每一个支流做聚合。 sum()min()max()minBy()maxBy()

(6)Reduce 归并操作,它可以将KeyedStream 转变为 DataStream,实质是按照key做叠加计算。

  • 多流转换算子 (7)Split DataStream转换为SplitStream,根据某些特征将一个DataStream拆分成两个或多个DataStream(结合Select提取数据)。 分流作用:处理kafka复杂数据中有效的数据,盖戳分流消费。

(8)Select(@deprecated:side output) 结合Split,将SplitStream数据提取出来,变为DataStream。

(9)Connect 两个DataStream(可以是不同类型流)合并为一个ConnectedStreams,但内部仍属于各自独立的DataStream。

(10)CoMap,CoFlatMap 结合Connect,将ConnectedStreams(可以是不同类型流)合并为一个DataStream。

(11)Union 一个或多个DataStream(是相同类型流)合并为一个DataStream。

3* 支持的数据类型

(1)Java和Scala基础数据类型; (2)Java和Scala元组(Tuples); (3)Scala样例类(case classes) (4)Java简单对象(POJO); (5)其他(ArrayList、HashMap、Enums)。

3** 实现UDF函数(更细粒度的控制流)

  1. 函数类(Function Classes) 自定义类继承对应的函数类,可以传参。
  2. 匿名函数(Lambda Function)
  3. 富函数(Rich Function) DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。与常规函数的区别是,可以获取运行环境的上下文,并拥有一些生命周期方法(open、close、invoke)。 如MapFunction对应RichMapFunction。

4. Sink

Flink对外输出操作必须利用Sink完成(addSink(new SinkFunction(){})),print()实际调用的也是DataStreamSink方法,此外,官方提供了一部分框架的Sink。(Kafka提供了Source和Sink) (1)Kafka

代码语言:javascript复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
代码语言:javascript复制
env.addSink(new FlinkKafkaConsumer011[String]("${id:port}", "brokerList", new SimpleStringSchema()))

// 到这里就实现了Kafka进,Kafka出

(2)Redis

代码语言:javascript复制
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis-2.11</artifactId>
  <version>${bahir.version}</version>
</dependency>

(3)ES

代码语言:javascript复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

(4)MySQL(JDBC连接)

代码语言:javascript复制
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>${mysql.version}</version>
</dependency>

需要自定义RichSinkFunction(),仅在初始化时调用连接

二、Flink Window API

1. Window概念

将无界数据流切分为有界数据流集进行处理,窗口(window)就是切分无界流的一种方式,将流数据分发到有限大小的桶(bucket)中进行分析。

(1)类型 Time Window:

  • 滚动时间窗口(Tumbling Windows) 将数据依据固定的窗口长度 windows size 1个参数对数据进行切分,时间对齐,窗口长度固定,没有重叠。
  • 滑动时间窗口(Sliding Windows) 由固定的窗口长度 windows size 和滑动间隔 slice 2个参数组成 ,窗口长度固定,可以有重叠。当滑动间距等于窗口长度时为滚动时间窗口。(同一个数据可能属于不同的窗口)
  • 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的timeout间隙组成,即一段时间没有接收到新的数据就会生成新的窗口。时间无对齐。(无计数窗口,因为不能舍弃一段数据)

Count Window

  • 滚动计数窗口
  • 滑动计数窗口

2. Window API

窗口分配器window()方法,必须在keyBy之后才能用,再做聚合操作。flink还提供了.timeWindow.countWindow方法。

(1)WindowAssigner window()方法接收的参数是一个WindowAssigner。 Flink提供了: 滚动窗口(.timeWindow(Time.secounds(15))); 滑动窗口(.timeWindow(Time.secounds(15), Time.secounds(15))); 会话窗口(.window(EventTimeSessionWindows.withGap(Time.minutes(10)))); 全局窗口(一个无界流); 滚动计数窗口(.countWindow(5)); 滑动计数窗口(.countWindow(10, 2))。

(2)WindowFunction 定义了要对窗口中收集的数据做的计算操作。

  • 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态,计算内容简单。ReduceFunction、AggregateFunction。
  • 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction。

(3)其他可选API .trigger():触发器,定义window什么时候关闭,触发计算并输出结果。 .evitor():移除器,定义移除某些数据的逻辑。 .allowedLateness():允许处理迟到(窗口关闭后)的数据。 .sideOutputLateData():将迟到的数据放入侧输出流。 .getSideOutPut():获取侧输出流。

0 人点赞