flink时间系统系列篇幅目录:
一、时间系统概述介绍
二、Processing Time源码分析
三、Event Time源码分析
四、时间系统在窗口函数中的应用分析
五、ProcessFunction 使用分析
六、实例讲解:如何做定时输出
今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理中需要将任务处理的结果数据定时输出到外部存储中例如mysql/hbase等,如果我们单条输出就可能会造成对外部存储造成较大的压力,首先我们想到的批量输出,就是当需要输出的数据累计到一定大小然后批量写入外部存储,这种方式在flink 官方文档的operator state篇其实给了很好的实践例子,实现了批量输出并且对内存中缓存的数据做了state容错机制,保证数据不会丢失,但是同样存在这样的场景:某些业务可能有高低峰期,在高峰的时候,批量输出在外部存储中可以查到结果数据,但是在业务低峰期可能很长时间都满足输出条件,导致的结果是很长时间都看不到结果数据,这个时候就需要做定时输出。
先来看下一下大家可能想到的几种定时输出方案:
一、在sinkFunction 里面做一个定时器定时将内存的数据输出到外部存储,但是这里有两点需要考虑:a. 自己做定时器是一个异步执行过程,如果抛出异常是否能够被flink检测到并且使任务失败(经过实际测试是不能的);b. 异步输出所读取的数据与invoke里面写入的数据是存储在同于个存储(List)里面的,这里又要考虑线程安全的问题,那么需要做数据同步;
二、由于checkpoint是周期性的执行,那么我们可以利用在任务每次checkpoint的时候做一次数据检查将数据写入外部存储,也就是在CheckpointedFunction.snapshotState 方法中将数据输出,但是这种方式必须与checkpoint的时间同步,缺乏灵活性。
三、使用KeyedProcessFunction 来实现,在KeyedProcessFunction 可以使用flink提供的定时机制完成,但是有一个限制就是只针对KeyedStream流处理,在通常情况下输出的是一个DataStream.
由以上几种方案的弊端可知,要实现定时输出功能需要考虑以下几点:
1. 定时提供给用户灵活可配置
2. 缓存的数据必须提供容错
3. 定时输出错误必须能够抛出给flink
4. 定时输出读取的数据与invoke处理的数据同步性
5. 满足DataStream类型流输出
对于第一点很好实现做成参数配置即可,第二点缓存数据容错使用flink状态容错机制即可,重点看第三、四点。
首先声明一点定时输出是一个ProcessingTime的定时,在来看第三点异常捕获,在flink注册处理时间定时器所触发的定时处理同样是一个异步线程完成,那么在这里面是如何做到异步异常获取的,查看触发位置SystemProcessingTimeService.TriggerTask,
可以查看这里抛出的异常由一个AsyncExceptionHandler类型的exceptionHandler对象处理, 追踪其来源可发现AsyncExceptionHandler是有StreamTask 实现传入进来,也就是当定时调用出现异常会调用StreamTask.handleAsyncException ,而该方法可以使任务抛出异常并且失败;在看第四点,正常invoke处理数据,定时也处理数据,那么必然涉及到状态的切换,细心的同学可以看到在触发定时调用时使用了synchronizd(lock) 同步锁,追踪其来源可以找到lock 表示的就是StreamTask 中的lock对象,而这个对象又会传给StreamInputProcessor(可任务是数据处理的入口,会调用StreamOperator处理数据),而这里每次处理数据的时候也会使用synchronizd(lock) ,
到这里我想大家都应该明白了,正常的数据流处理与定时逻辑处理只能同时有一个进行,那么就解决key切换带来状态操作问题,同时也为我们提供的解决思路,使用flink自带定时来帮助我们完成定时输出处理。
使用flink自带定时功能,首先我们得能够获取到ProcessingTimeService这个对象,但是该对象的获取只能在AbstractStreamOperator通过getProcessingTimeService方法获取到,那么我们可以自定义一个StreamOperator 继承AbstractStreamOperator,首先看下代码实现:
代码语言:javascript复制public abstract class CommonSinkOperator<T extendsSerializable> extends AbstractStreamOperator<Object>implements ProcessingTimeCallback, OneInputStreamOperator<T, Object> {
private List<T> list;
private ListState<T> listState;
private int batchSize;
private long interval;
private ProcessingTimeService processingTimeService;
public CommonSinkOperator() {
}
public CommonSinkOperator(int batchSize, long interval){ this.chainingStrategy = ChainingStrategy.ALWAYS;
this.batchSize = batchSize;
this.interval = interval;
}
@Override public void open() throws Exception {
super.open();
if (interval > 0 && batchSize > 1) {
processingTimeService = getProcessingTimeService();
long now=processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now interval, this);
}
}
@Override public void initializeState(StateInitializationContext context) throws Exception{ super.initializeState(context);
this.list = new ArrayList<T>();
listState = context.getOperatorStateStore() .getSerializableListState("batch-interval-sink"); if (context.isRestored()) {
listState.get().forEach(x -> {
list.add(x);
});
}
}
@Override public void processElement(StreamRecord<T> element) throws Exception {
list.add(element.getValue());
if (list.size() >= batchSize) {
saveRecords(list);
}
}
@Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context);
if (list.size() > 0) {
listState.clear();
listState.addAll(list);
}
}
@Override public void onProcessingTime(long timestamp)throws Exception { if (list.size() > 0) {
saveRecords(list);
list.clear();
}
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now interval, this);
}
public abstract void saveRecords(List<T> datas);
}
定义一个CommonSinkOperator的抽象类,继承AbstractStreamOperator,并且实现ProcessingTimeCallback与OneInputStreamOperator接口,那么看下类里面的方法,
- 首先看构造函数传入批写大小batchSize与定时写入时间interval
- 重载了AbstractStreamOperator的open方法,在这个方法里面获取ProcessingTimeService,然后注册一个interval时长的定时器
- 重载AbstractStreamOperator的initializeState方法,用于恢复内存数据
- 重载AbstractStreamOperator的snapshotState方法,在checkpoint会将内存数据写入状态中容错
- 实现了OneInputStreamOperator接口的processElement方法,将结果数据写入到内存中,如果满足一定大小则输出
- 实现了ProcessingTimeCallback接口的onProcessingTime方法,注册定时器的执行方法,进行数据输出的同时,注册下一个定时器
- 定义了一个抽象saveRecords方法,实际输出操作
那么这个CommonSinkOperator就是一个模板方法,能够做到将任何类型的数据输出,只需要继承该类,并且实现saveRecords方法即可。
在这里也贴上一个测试的例子:
调用:
以上整个就是关于flink定时定量输出的实例分析,希望对大家有所帮助。