flink exactly-once系列目录: 一、两阶段提交概述 二、两阶段提交实现分析 三、StreamingFileSink分析 四、事务性输出实现 五、最终一致性实现 前几篇分析到Flink 是可以通过状态与checkpoint机制实现内部Exactly-Once 的语义,对于端到端的Exactly-Once语义,Flink 通过两阶段提交方式提供了对Kafka/HDFS输出支持,两阶段提交实现是结合checkpoint流程提供的hook来实现的,实现CheckpointedFunction与CheckpointListener接口: 1. initializeState 方法里面做事务状态的恢复与重新提交 2. snapshotState 方法里面开启事务与将需要输出的数据写到状态中容错 3. notifyCheckpointComplete方法提交事务 使用flink自带的实现要求继承TwoPhaseCommitSinkFunction类,并且实现beginTransaction、preCommit、commit、abort这几个方法,虽然说使用起来很方便,但是其有一个限制那就是所提供的事务hook(比喻Connection)能够被序列化,并且反序列化之后能够继续提交之前的事务,这个对于很多事务性的数据库是无法做到的,所以需要实现一套特有的事务提交。 之前分析到两阶段提交的主要问题是在第二阶段,commit有可能会存在部分成功与部分失败,所以才有了事务容错恢复,提交失败的重启继续提交,提交成功的重启再次提交是幂等的不会影响数据的结果,现在没有了这样一个可序列化的事务hook,另外需要提交的数据也做了状态容错。但是Flink 在checkpoint机制中提供了一个唯一的标识checkpointId, 它是用户可访问的、单调递增的、容错的,任务失败之后会从最近一次成功点继续递增,那么就可以使用checkpointId 来作为事务提交的句柄,首先看一下逻辑流程:
1. invoke 方法:将需要提交的数据添加到内存List中 2. snapshotState方法:将checkpointId与list存放在状态中 3. notifyCheckpointComplete方法:将list与checkpointId做事务性提交,并且使用checkpointId做CAS机制 4. initializeState方法:从状态中恢复checkpointId与list数据,同样做事务性提交
代码实现:
代码语言:javascript复制public abstract class CommonTwoPhaseCommit<IN extends Serializable> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
private long checkpointId;
private List<IN> dataList;
private ListState<IN> dataListState;
private ListState<Long> checkpointIdState;
@Override public void initializeState(FunctionInitializationContext context) throws Exception {
dataList=new ArrayList<>();
dataListState=context.getOperatorStateStore().getSerializableListState("listdata");
checkpointIdState=context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("checkpointI",Long.class));
if(context.isRestored())
{
dataListState.get().forEach(x->{
dataList.add(x);
});
Iterator<Long> ckIdIter=checkpointIdState.get().iterator();
checkpointId=ckIdIter.next();
commit(dataList,checkpointId);
}
}
@Override public void invoke(IN value, Context context) throws Exception {
dataList.add(value);
}
@Override public void snapshotState(FunctionSnapshotContext context) throws Exception {
dataListState.clear();
dataListState.addAll(dataList);
dataList.clear();
checkpointIdState.clear();
checkpointId=context.getCheckpointId();
checkpointIdState.addAll(Collections.singletonList(checkpointId));
}
@Override public void notifyCheckpointComplete(long checkpointId) throws Exception {
commit(dataListState.get(),checkpointId);
}
/**
* 使用checkpoint与数据库已经存在值进行比较,要求正好比其大1
* @param data
* @param checkpointId
*/
public abstract void commit(Iterable<IN> data,long checkpointId);
}
那么只需要继承CommonTwoPhaseCommit 类,实现其commit方法,做事务提交即可。目前该方案用于对window窗口聚合的延时补偿处理中,输出端为MySql,后期将会研究对Redis等其他数据库如何做一致性处理。