Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方
Flink 中 和spark算子一致的算子
- Map, FlaMap 做一对一,一对多映射
- Reuce 多对一进行聚合
- 聚合函数,sum,min,minBy,MaxBy 等
- keyBy 按Key进行分组 名字不一样但是操作一样。
Flink 特有的或需要重新理解的算子
- 窗口函数: 窗口函数用于对每一个key开窗口,windowsAll 全体元素开窗口
text.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
text.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
窗口函数实际上分为滚动时间窗口,滑动时间窗口,会话窗口
滚动时间窗口不会发生重叠, 滑动时间窗口,当步长小于窗口大小,就会重叠。
会话窗口是根据相邻时间间隔确定窗口边界
全局窗口必须定义触发器
在窗口内也可以进行其他的操作
- 窗口连接
两个数据源相同窗口内的连接
代码语言:javascript复制text.join(windowCounts)
.where()
.equalTo()
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((e1,e2) => e1 "," e2)
代码语言:javascript复制0 1 2
0 1 2 3
0,1 0,1 1,0 1,0 2,2 3,2 一个窗口一个窗口内连接
间隔连接,以一个元素去连接一个窗口形成锥形
代码语言:javascript复制 text.keyBy(0).intervalJoin(windowCounts.keyBy(0))
.between(Time.milliseconds(-2),Time.milliseconds(2))
.process(new ProcessJoinFunction[Integer,Integer,String] {
override def processElement(in1: Integer, in2: Integer, context: ProcessJoinFunction[Integer, Integer, String]#Context, collector: Collector[String]): Unit = {
collector.collect(in1 "," in2)
}
})
```
```
0 1 6
0 2 3
以2 进行聚合 2,0 2,1
- 数据分区
数据分区的好处是,如果分区数和算子数一致,则他们会直接运行到一个节点,通过内存进行传输,减少网络带宽的压力
自定义分区 :
代码语言:javascript复制text.partitionCustom(partitioner,"key")
使用shuffle() 进行均匀分区
代码语言:javascript复制text.shuffle()`
使用负载均衡的轮询调度算法进行数据分区
代码语言:javascript复制text.rebalance
可伸缩动态分区,使数据尽可能在一个slot内流转,减少网络开销
代码语言:javascript复制dataStream.rescale()
广播分区,每一个元素广播到下一个节点
代码语言:javascript复制text.broadcast()
- 资源共享
Flink 将多个任务连接成一个任务在一个线程中执行,以实现资源共享
(1) 创建链, 开启作业优化
代码语言:javascript复制dataStream.map(..).map(...).startNewChain().map(...)
(2) Slot共享组
在同一个组所有任务在同一个实例中运行
代码语言:javascript复制dataStream.map(...).slotSharingGroup("name")
(3) 关闭作业优化
代码语言:javascript复制dataStream.map(...).disableChaining()
- RichFunction函数
处理函数生命周期和获取函数上下文能力的算子
代码语言:javascript复制@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
private static final long serialVersionUID = 1L;
private transient RuntimeContext runtimeContext;
public AbstractRichFunction() {
}
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
// 得到上下文函数
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
throw new IllegalStateException("The runtime context has not been initialized.");
} else if (this.runtimeContext instanceof IterationRuntimeContext) {
return (IterationRuntimeContext)this.runtimeContext;
} else {
throw new IllegalStateException("This stub is not part of an iteration step function.");
}
}
// 自定义初始化和销毁函数
public void open(Configuration parameters) throws Exception {
}
public void close() throws Exception {
}
}
- 触发器
基于事件的触发器
(1)onElement 窗口没收到一个元素,调用该方法
(2)onProcessingTime 根据注册处理时间进行触发,定时可以参数设定
(3)onEventTime 根据注册事件时间进行触发,定时可以参数设定
(4)onMerge 两个窗口合并时触发
- 清除器
在触发器后函数执行窗口前或者后执行清除的操作
evictor()可以在触发器后,窗口执行前或者后都可以触发
- 状态分类
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(...)
设置状态后端,内存,JVM堆内存,JVM堆外内存,
9.检查点
检查点是Flink实现 exactly-once 语义的核心机制,启用检测点,需要:
(1) 支持时空穿梭的外部数据源, kafka 和 分布式文件系统
(2) 可持久化状态的外部存储, 如分布式文件系统。
检查点默认是关闭的,启用检查点需要配置 一致性级别, exactly-once
检测超时时间,
Kafka进行流计算实例
- 创建连接器
添加kafka source
代码语言:javascript复制 // 设置配置文件
val properties = new Properties()
properties.setProperty("bootstraps.servers","localhost:9092")
properties.setProperty("zookeeper.connect","localhost:2181")
// 设置消费组
properties.setProperty("group.id","test")
val myConsumer = new FlinkKafkaConsumer09[String]("topic",
new SimpleStringSchema(),properties)
stream = env.addSource(myConsumer)
- 创建反序列化器
// https://mvnrepository.com/artifact/org.apache.flink/flink-avro
compile group: 'org.apache.flink', name: 'flink-avro', version: '1.7.1'
- 设置消息起始位置的偏移 设置 据上一次的偏移位置
myConsumer.setStartFromGroupOffsets()
// 从最早和最晚开始记录
代码语言:javascript复制myConsumer.setStartFromEarliest()
myConsumer.setStartFromLatest()
// 从固定的时间点开始
代码语言:javascript复制myConsumer.setStartFromTimestamp(23L)
// 设置分区的起始位置
代码语言:javascript复制val specificStartOffsets = new java.util.HashMap[]
- 设置检查点
env.enableCheckpointing(5000)