Flink
- 一、Flink流处理API
- 1. Environment
- 2. Source
- 3. Transform
- 3* 支持的数据类型
- 3** 实现UDF函数(更细粒度的控制流)
- 4. Sink
- 二、Flink Window API
- 1. Window概念
- 2. Window API
流处理系统由于需要支持无限数据集的处理,一般采用一种数据驱动的处理方式。它会提前设置一些算子,然后等到数据到达后对数据进行处理。
为了表达复杂的逻辑,flink在内的分布式流处理引擎,一般采用 DAG(有向无环图) 图来表示整个计算逻辑,其中 DAG 图中的每一个点就代表一个基本的逻辑单元,也就是前面说的算子,由于计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理,最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。
一、Flink流处理API
Environment -> Source -> Transform -> Sink 懒加载模式,需要手动执行。
1. Environment
1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文。自动查询当前运行的方式,返回Local或Remote,调用底层方法。
1.2 createLocalEnvironment 返回本地执行环境,需要在调用时指定默认的并行度。
1.3 createRemoteEnvironment 返回集群执行环境,将Jar包提交到远程服务器。需要在调用时制定JM的IP和端口号,并指定要在集群中运行的Jar包(有变动需要修改源码)。
2. Source
2.1 fromCollection 有界流:从自定义的集合中读取、从文件中读取
无界流:从Kafka中读取数据
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-${kafka.version}_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
先定义Kafka的Properties,env.addSource(new FlinkKafkaConsumer${version}[String](,,));
。Flink会将Kafka的Offset作为状态保存,并保证状态一致性。
自定义Source:自定义一个继承SourceFunction类
3. Transform
常见的转换算子:map、flatMap、Filter、KeyBy、(基本)滚动聚合算子、Reduce、(聚合)Split、Select、Connect、CoMap、Union(多流转换)。并行度可以在每个算子后设置。
- 基本转换算子 (1)map 映射,对每个元素进行一定的变换后,映射为另一个元素。输出泛型可以变化,常用作分词操作。
(2)flatMap 将元素摊平,每个元素可以变为0个、1个、或者多个元素。
(3)Filter 过滤元素。
(4)KeyBy DataStream转换为KeyedStream,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素(内部hash),分区不分流。
- 聚合算子 (5)滚动聚合算子(Rolling Aggregation) 针对KeyedStream的每一个支流做聚合。 sum()min()max()minBy()maxBy()
(6)Reduce 归并操作,它可以将KeyedStream 转变为 DataStream,实质是按照key做叠加计算。
- 多流转换算子 (7)Split DataStream转换为SplitStream,根据某些特征将一个DataStream拆分成两个或多个DataStream(结合Select提取数据)。 分流作用:处理kafka复杂数据中有效的数据,盖戳分流消费。
(8)Select(@deprecated:side output) 结合Split,将SplitStream数据提取出来,变为DataStream。
(9)Connect 两个DataStream(可以是不同类型流)合并为一个ConnectedStreams,但内部仍属于各自独立的DataStream。
(10)CoMap,CoFlatMap 结合Connect,将ConnectedStreams(可以是不同类型流)合并为一个DataStream。
(11)Union 一个或多个DataStream(是相同类型流)合并为一个DataStream。
3* 支持的数据类型
(1)Java和Scala基础数据类型; (2)Java和Scala元组(Tuples); (3)Scala样例类(case classes) (4)Java简单对象(POJO); (5)其他(ArrayList、HashMap、Enums)。
3** 实现UDF函数(更细粒度的控制流)
- 函数类(Function Classes) 自定义类继承对应的函数类,可以传参。
- 匿名函数(Lambda Function)
- 富函数(Rich Function) DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。与常规函数的区别是,可以获取运行环境的上下文,并拥有一些生命周期方法(open、close、invoke)。 如MapFunction对应RichMapFunction。
4. Sink
Flink对外输出操作必须利用Sink完成(addSink(new SinkFunction(){})),print()实际调用的也是DataStreamSink方法,此外,官方提供了一部分框架的Sink。(Kafka提供了Source和Sink) (1)Kafka
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
代码语言:javascript复制env.addSink(new FlinkKafkaConsumer011[String]("${id:port}", "brokerList", new SimpleStringSchema()))
// 到这里就实现了Kafka进,Kafka出
(2)Redis
代码语言:javascript复制<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis-2.11</artifactId>
<version>${bahir.version}</version>
</dependency>
(3)ES
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
(4)MySQL(JDBC连接)
代码语言:javascript复制<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
需要自定义RichSinkFunction(),仅在初始化时调用连接
二、Flink Window API
1. Window概念
将无界数据流切分为有界数据流集进行处理,窗口(window)就是切分无界流的一种方式,将流数据分发到有限大小的桶(bucket)中进行分析。
(1)类型 Time Window:
- 滚动时间窗口(Tumbling Windows) 将数据依据固定的窗口长度 windows size 1个参数对数据进行切分,时间对齐,窗口长度固定,没有重叠。
- 滑动时间窗口(Sliding Windows) 由固定的窗口长度 windows size 和滑动间隔 slice 2个参数组成 ,窗口长度固定,可以有重叠。当滑动间距等于窗口长度时为滚动时间窗口。(同一个数据可能属于不同的窗口)
- 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的timeout间隙组成,即一段时间没有接收到新的数据就会生成新的窗口。时间无对齐。(无计数窗口,因为不能舍弃一段数据)
Count Window
- 滚动计数窗口
- 滑动计数窗口
2. Window API
窗口分配器window()
方法,必须在keyBy之后才能用,再做聚合操作。flink还提供了.timeWindow
和.countWindow
方法。
(1)WindowAssigner window()方法接收的参数是一个WindowAssigner。 Flink提供了: 滚动窗口(.timeWindow(Time.secounds(15))); 滑动窗口(.timeWindow(Time.secounds(15), Time.secounds(15))); 会话窗口(.window(EventTimeSessionWindows.withGap(Time.minutes(10)))); 全局窗口(一个无界流); 滚动计数窗口(.countWindow(5)); 滑动计数窗口(.countWindow(10, 2))。
(2)WindowFunction 定义了要对窗口中收集的数据做的计算操作。
- 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态,计算内容简单。ReduceFunction、AggregateFunction。
- 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction。
(3)其他可选API .trigger():触发器,定义window什么时候关闭,触发计算并输出结果。 .evitor():移除器,定义移除某些数据的逻辑。 .allowedLateness():允许处理迟到(窗口关闭后)的数据。 .sideOutputLateData():将迟到的数据放入侧输出流。 .getSideOutPut():获取侧输出流。