day03_Flink高级API
今日目标
- Flink的四大基石
- Flink窗口Window操作
- Flink时间Time
- Flink水印Watermark机制
- Flink的state状态管理-keyed state 和 operator state
Flink的四大基石
- Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
- State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
- Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
- Window窗口,TimeWindow 、 countwindow、 sessionwindow
Window操作
Window分类
- time
- 用的比较多 滚动窗口和滑动窗口
- count
如何使用
案例
- 需求
/**
* Author itcast
* Date 2021/5/7 9:13
* 有如下数据表示:
* 信号灯编号和通过该信号灯的车的数量
* 9,3
* 9,2
* 9,7
* 4,9
* 2,6
* 1,5
* 2,3
* 5,7
* 5,4
* 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
* 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
*/
Time 时间
- EventTime的重要性
- 防止出现网络抖动,造成数据的乱序,数据统计的丢失
- 窗口: 开始时间-结束时间
watermark 水印时间
- watermark 水印机制
- watermark 就是时间戳
- watermark = eventTime - maxDelayTime
- 触发计算 watermak >= 结束时间
watermark 案例
Allowed lateness
- 案例 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
package cn.itcast.sz22.day03;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author itcast
* Date 2021/5/7 14:51
* 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
*/
public class WatermarkDemo03 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//模拟实时订单数据(数据有延迟和乱序)
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String orderId = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(100);
//模拟数据延迟和乱序!
long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
ctx.collect(new Order(orderId, userId, money, eventTime));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
flag = false;
}
});
OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
//分配水印机制 eventTime 默认使用 maxDelay 3秒
SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
.keyBy(t -> t.getUserId())
//窗口设置 每隔5s,计算5秒内
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//实例化侧输出流 主要用于晚于最大延迟 3 秒的数据
.allowedLateness(Time.seconds(3))
.sideOutputLateData(oot)
//统计
.sum("money");
result.print("正常数据");
result.getSideOutput(oot).print("严重迟到的数据");
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
状态管理 state
- 有状态管理场景
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-znYxlAeB-1624261970363)(assets/image-20210507151242102.png)]
- 是否被Flink托管分为两类
- managed state 通过Flink自身进行状态的管理 数据结构: valueState ListState mapState
- raw state 需要用户、程序员自己维护状态 数据结构: ListState
- 是否基于 key 进行state 管理
- keyed state 数据结构: valueState ListState mapState reducingState
- operator state 数据结构: ListState
Keyed state
- 案例 - 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,输入Tuple2单词/, Long/长度/> 输出 Tuple3单词/, Long/长度/, Long/历史最大值/> 类型
- map映射
- 定义valueState 用于统计当前的 历史最大值
- 输出 Tuple3单词/, Long/长度/, Long/历史最大值/>
package cn.itcast.sz22.day03;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author itcast
* Date 2021/5/7 15:58
* 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义.
*/
public class StateDemo01 {
public static void main(String[] args) throws Exception {
//1.env 设置并发度为1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.Source 参看课件
DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L)
);
//3.Transformation
//使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
//实现方式1:直接使用maxBy--开发中使用该方式即可
DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
.maxBy(1);
//min只会求出最小的那个字段,其他的字段不管
//minBy会求出最小的那个字段和对应的其他的字段
//max只会求出最大的那个字段,其他的字段不管
//maxBy会求出最大的那个字段和对应的其他的字段
//实现方式2:通过managed state输入的state
//Tuple2 输出 Tuple3
SingleOutputStreamOperator<Tuple3<String, Long, Long>> maxCount = tupleDS.keyBy(t -> t.f0)
.map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
//保存当前内存中最大的值的state
private transient ValueState<Long> currentMaxValue;
@Override
public void open(Configuration parameters) throws Exception {
//存储到内存中的数据结构的描述
ValueStateDescriptor desc = new ValueStateDescriptor("maxCount", TypeInformation.of(Long.class));
currentMaxValue = getRuntimeContext().getState(desc);
}
@Override
public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
String city = value.f0;
Long currentValue = value.f1;
if (currentMaxValue.value() == null || currentMaxValue.value() < currentValue) {
currentMaxValue.update(currentValue);
return Tuple3.of(city, currentValue, currentMaxValue.value());
} else {
return Tuple3.of(city, currentValue, currentMaxValue.value());
}
}
});
//3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2 输出 Tuple3
//-1.定义值类型的状态用来存储最大值
//3.2.重写 RichMapFunction 的open 方法
//-2.定义状态描述符
//-3.从当前上下文获取内存中的状态值
//3.3.重写 map 方法
//-4.获取state中历史最大值value和当前元素的最大值并比较
//-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果
//4.Sink 打印输出
// result1.printToErr();
maxCount.print();
//5.execute 执行环境
env.execute();
}
}
operate state
- 大多数场景就是读取 source ,用到数据结构 ListState
- 案例 - 使用ListState存储offset模拟Kafka的offset维护
package cn.itcast.sz22.day03;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
* Author itcast
* Date 2021/5/7 16:59
* 使用ListState存储offset模拟Kafka的offset维护
*/
public class OperatorStateDemo01 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
env.enableCheckpointing(1000);//每隔 1s 执行一次Checkpoint
//将全局的状态保存到哪里? hdfs://node1:8020/checkpoint/
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
//当前任务被取消,checkpoint是否被保存下来
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//当前checkpoint 机制 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
//2.Source
DataStreamSource<String> sourceData = env.addSource(new MyMoniKafkaSource());
//3.Transformation
//4.Sink
sourceData.print();
//5.execute
env.execute();
}
//1.创建类 MyMoniKafkaSource 继承 RichparallelSourceFunction 并实现 CheckpointedFunction
public static class MyMoniKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction{
//1.1. 定义ListState用于存储 offsetState、offset、flag
ListState<Long> offsetState;
Long offset = 0L;
boolean flag = true;
//1.2. 重写 initializeState 方法
// //创建List状态描述器
// //根据状态描述器初始化状态通过context
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offsetState",
TypeInformation.of(Long.class));
offsetState = context.getOperatorStateStore()
.getListState(desc);
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
//获取并迭代ListState中的值,如果存在赋值给offset
Iterable<Long> offsets = offsetState.get();
if(offsets.iterator().hasNext()){
offset = offsets.iterator().next();
}
// //while(flag)
while(flag){
//将处理的offset累加1、获取当前子任务的Index
offset = 1;
//ctx收集id和offset("分区:" id "消费到offset的位置为:" offset
int id = getRuntimeContext().getIndexOfThisSubtask();
ctx.collect("分区:" id "消费到offset的位置为:" offset);
Thread.sleep(2000);
//)并输出
// //休眠2秒,此时保存state到checkpoint
// //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复
if(offset%5==0){
System.out.println("当前程序出现bug");
throw new Exception("当前程序出现bug");
}
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//清空内存offsetState中存储的offset
offsetState.clear();
//添加offset到state中
offsetState.add(offset);
}
}
}
set的位置为:" offset int id = getRuntimeContext().getIndexOfThisSubtask(); ctx.collect(“分区:” id “消费到offset的位置为:” offset); Thread.sleep(2000); //)并输出 // //休眠2秒,此时保存state到checkpoint // //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复 if(offset%5==0){ System.out.println(“当前程序出现bug”); throw new Exception(“当前程序出现bug”); } } }
代码语言:javascript复制 @Override
public void cancel() {
flag = false;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//清空内存offsetState中存储的offset
offsetState.clear();
//添加offset到state中
offsetState.add(offset);
}
}
}