Flink之状态编程

2023-10-20 08:30:01 浏览数 (2)

摘要本文将从状态的概念入手,详细介绍 Flink 中的状态分类、状态的使用、持久化及状态后端的配置。

一、Flink状态概念

Flink的处理机制核心:有状态的流式计算,那么什么是有状态,什么是无状态呢?

在流式处理中,数据是连续不断的到来和处理的,每个任务在计算的时候,可以基于当前数据直接转换就能得到结果如map,filter(无状态), 也可以是依赖上一个数据才能得到结果,这个时候我们就需要将上一个结果记录下来如sum(有状态)。

下面的几个场景都需要使用流处理的状态功能: 1、数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。 2、检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。 3、对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。

有状态的算子处理流程如下: 1、接收到上游数据 2、通过上下文获取当前状态 3、根据业务逻辑计算,更新状态 4、将处理结果输出给下游

Flink的算子任务,可以设置并行度,从而在不同的slot运行多个实例,我们把这个实例叫成“并行子任务”或者“算子子任务”。

二、状态分类

1、托管状态(推荐):由flink统一管理 存储、故障恢复、重组等 2、原始状态: 需要我们自定义,一般不用除非托管搞不定

重点介绍托管状态 我们知道 Flink一个算子任务,可以分为多个并行子任务,分配在不同的任务槽(task slot)中运行,而这些slot的计算资源是物理隔离的, 所以flink管理的的状态是在不同的并行子任务是无法共享的,基于这个想法我们可以将状态分为 算子状态和按键状态

算子状态:状态的作用在一个并行子任务,也就是一个算子子任务,所有这个子任务处理的数据共享一个状态 按键状态:我们的流可以根据keyby进行分组成keyedStream,这个时候同一个key共享一个状态

值得注意的是无论是keyed state还是operator state,他们都是在本地实例上进行维护的,也就是说每一个并行子任务维护着对应的状态 算子子任务之间的状态并不能共享。

算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。

三、状态数据结构

按键状态数据结构分为5种: 1、值状态(ValueState) 2、列表状态(ListState) 3、映射状态(MapState) 4、归约状态(ReducingState) 5、聚合状态(AggregatingState)

算子状态数据结构分为3种 1、列表状态(ListState) 2、联合列表状态(UnionListState) 3、广播状态(BroadcastState): 有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。

代码语言:javascript复制
public void open(Configuration parameters) throws Exception {
lastTemperatureValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
}

一般来说我们在生命周期方法.open()中获取状态对象。但这个变量不应该在 open 中声明——应该在外面直接把它定义为类的属性, 这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。 所以最终的解决方案就变成了:在外部声明状态对象,在 open 生命周期方法中通过运行时上下文获取状态。

四、状态具体使用demo

代码语言:javascript复制
import dto.SensorReadingDTO;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import util.DateUtil;
//如果传感器的温度差大于10度就预警
//使用状态记录上一次的状态
public class Status_1_KeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
DataStream<SensorReadingDTO> dataStream = streamSource
.map(new MapFunction<String, SensorReadingDTO>() {
@Override
public SensorReadingDTO map(String input) throws Exception {
if (StringUtils.isNotBlank(input)) {
String[] infoArray = input.split(",");
SensorReadingDTO sensorReadingDTO = new SensorReadingDTO();
sensorReadingDTO.setId(infoArray[0]);
sensorReadingDTO.setTimestamp(Long.valueOf(infoArray[1]) * 1000);
sensorReadingDTO.setTemperature(Double.valueOf(infoArray[2]));
sensorReadingDTO.setTimestampStr(DateUtil.format(sensorReadingDTO.getTimestamp()));
return sensorReadingDTO;
}
return null;
}
});
//使用flatMap 可以输出0,1个或多个,没有超过10度的,就不要输出
//但是使用map 只能输入1个,必须输出一个,所以不符合
DataStream<Tuple3<String,Double,Double>> checkDataStream = dataStream.keyBy(SensorReadingDTO::getId)
.flatMap(new MyMapper(Double.valueOf(10)));
checkDataStream.print();
env.execute();
}
public static class MyMapper extends RichFlatMapFunction<SensorReadingDTO, Tuple3<String,Double,Double>> {
private ValueState<Double> lastTemperatureValueState;
private final Double threshold;
public MyMapper(Double threshold) {
this.threshold = threshold;
}
@Override
public void open(Configuration parameters) throws Exception {
lastTemperatureValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
}
@Override
public void close() throws Exception {
//释放
lastTemperatureValueState.clear();
}
@Override
public void flatMap(SensorReadingDTO sensorReadingDTO, Collector<Tuple3<String,Double,Double>> out) throws Exception {
//第一次 为空记录当前温度
Double lastTemp = lastTemperatureValueState.value();
Double curTemp = sensorReadingDTO.getTemperature();
// 如果不为空,判断是否温差超过阈值,超过则报警
if (lastTemp != null) {
if (Math.abs(curTemp - lastTemp) >= threshold) {
out.collect(new Tuple3<>(sensorReadingDTO.getId(), lastTemp, curTemp));
}
}
// 更新保存的"上一次温度"
lastTemperatureValueState.update(curTemp);
}
}
}

五、状态后端

1、MemoryStateBackend 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中 特点:快速、低延迟,但不稳定 2、FsStateBackend(默认) 将checkpoint存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上 同时拥有内存级的本地访问速度,和更好的容错保证 3、RocksDBStateBackend 将所有状态序列化后,存入本地的RocksDB中存储

0 人点赞