flink exactly-once系列之事务性输出实现

2022-04-18 11:27:35 浏览数 (1)

flink exactly-once系列目录: 一、两阶段提交概述 二、两阶段提交实现分析 三、StreamingFileSink分析 四、事务性输出实现 五、最终一致性实现 前几篇分析到Flink 是可以通过状态与checkpoint机制实现内部Exactly-Once 的语义,对于端到端的Exactly-Once语义,Flink 通过两阶段提交方式提供了对Kafka/HDFS输出支持,两阶段提交实现是结合checkpoint流程提供的hook来实现的,实现CheckpointedFunction与CheckpointListener接口: 1. initializeState 方法里面做事务状态的恢复与重新提交 2. snapshotState 方法里面开启事务与将需要输出的数据写到状态中容错 3. notifyCheckpointComplete方法提交事务 使用flink自带的实现要求继承TwoPhaseCommitSinkFunction类,并且实现beginTransaction、preCommit、commit、abort这几个方法,虽然说使用起来很方便,但是其有一个限制那就是所提供的事务hook(比喻Connection)能够被序列化,并且反序列化之后能够继续提交之前的事务,这个对于很多事务性的数据库是无法做到的,所以需要实现一套特有的事务提交。 之前分析到两阶段提交的主要问题是在第二阶段,commit有可能会存在部分成功与部分失败,所以才有了事务容错恢复,提交失败的重启继续提交,提交成功的重启再次提交是幂等的不会影响数据的结果,现在没有了这样一个可序列化的事务hook,另外需要提交的数据也做了状态容错。但是Flink 在checkpoint机制中提供了一个唯一的标识checkpointId, 它是用户可访问的、单调递增的、容错的,任务失败之后会从最近一次成功点继续递增,那么就可以使用checkpointId 来作为事务提交的句柄,首先看一下逻辑流程:

1. invoke 方法:将需要提交的数据添加到内存List中 2. snapshotState方法:将checkpointId与list存放在状态中 3. notifyCheckpointComplete方法:将list与checkpointId做事务性提交,并且使用checkpointId做CAS机制 4. initializeState方法:从状态中恢复checkpointId与list数据,同样做事务性提交

代码实现:

代码语言:javascript复制
public abstract class CommonTwoPhaseCommit<IN extends Serializable> extends RichSinkFunction<IN>

        implements CheckpointedFunction, CheckpointListener {





    private long checkpointId;

    private List<IN> dataList;



    private ListState<IN> dataListState;

    private ListState<Long> checkpointIdState;





    @Override public void initializeState(FunctionInitializationContext context) throws Exception {



        dataList=new ArrayList<>();

        dataListState=context.getOperatorStateStore().getSerializableListState("listdata");

        checkpointIdState=context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("checkpointI",Long.class));

        if(context.isRestored())

        {

            dataListState.get().forEach(x->{

                dataList.add(x);

            });

            Iterator<Long> ckIdIter=checkpointIdState.get().iterator();

            checkpointId=ckIdIter.next();

            commit(dataList,checkpointId);

        }

    }





    @Override public void invoke(IN value, Context context) throws Exception {

        dataList.add(value);

    }





    @Override public void snapshotState(FunctionSnapshotContext context) throws Exception {



        dataListState.clear();

        dataListState.addAll(dataList);

        dataList.clear();



        checkpointIdState.clear();

        checkpointId=context.getCheckpointId();

        checkpointIdState.addAll(Collections.singletonList(checkpointId));

    }



    @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {

        commit(dataListState.get(),checkpointId);

    }



    /**

     * 使用checkpoint与数据库已经存在值进行比较,要求正好比其大1

     * @param data

     * @param checkpointId

     */

    public abstract void commit(Iterable<IN> data,long checkpointId);



}
 

那么只需要继承CommonTwoPhaseCommit 类,实现其commit方法,做事务提交即可。目前该方案用于对window窗口聚合的延时补偿处理中,输出端为MySql,后期将会研究对Redis等其他数据库如何做一致性处理。

0 人点赞