[Long Read] An In-Depth Walkthrough of the Flink CDC Source Code (Recommended for Bookmarking)

2022-05-17 16:18:52

Preface

flink-cdc source code: https://github.com/ververica/flink-cdc-connectors. flink-cdc does not live inside the Flink project itself; Flink gained CDC support after 1.11. Below we use the source code to understand how flink-cdc is implemented, focusing on flink-cdc-mysql (the other connectors are largely similar). You should first get a basic grasp of how debezium works, since flink-cdc is built on top of it.

A few suggestions:

  • When reading source code, keep a question in mind and work through it step by step. Don't let unimportant details consume your time and energy, and don't expect one or two passes to leave a clear picture; after all, this is the work of many people over many years. Before reading a framework's source, you should already understand its principles, then use the code to verify that understanding, or read while asking why it is implemented this way and what the benefits are. In the end all code is similar; only each author's implementation style and the problems they considered differ.
  • You need a solid Java foundation: familiarity with multithreading and with the interfaces used in the code (or the ability to pick them up quickly from their docs). If the basics are shaky, it is better to learn them first and write some test code, e.g. how threads interact with each other; having done that yourself, the source code later will be much easier to follow.
  • You should first understand CDC itself and how it works. flink-cdc is implemented on top of debezium, which is open source; getting to know it first will make the rest of this article much easier to follow.

Keep in mind: grab the key points while reading!!! Don't let unimportant content eat up your time.

I. Project structure (focusing on mysql-cdc)

1. Directory layout

  • Modules containing test are for testing
  • Modules suffixed with cdc are connectors for a specific database, split into SQL and API flavors
  • flink-format-changelog-json : a module that parses changelog JSON into RowData
  • flink-connector-debezium : wraps debezium and contains the related core implementation; it also modifies part of the debezium source
  • Every module has a test directory with test code that you can run and debug yourself

2. Package structure of the mysql module

  • debezium : classes used by debezium
  • schema : code for the mysql schema (table structures)
  • source : the mysql-cdc source implementation, including the full read of mysql, the splitters, the readers, etc.
  • table : the cdc table implementation, mainly the table dynamic factory
  • resources : used for loading the table factory dynamically via SPI, so that tables created through SQL can find the corresponding factory class (see the service-file example below)
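For reference, the SPI wiring is just a service file on the classpath. The file name below is Flink's standard factory service; the factory class name follows the mysql-cdc module layout and should be treated as an assumption for your version:

  # src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
  com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory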

II. mysql-cdc source code: the single-parallelism SourceFunction implementation

  • Based on RichSourceFunction: single-parallelism reading; the pre-1.11 source interface, already marked Deprecated
  • Based on Source: multi-parallelism; the new source interface introduced in 1.11, with a more complex implementation

We will mainly walk through the single-parallelism source code, which makes it easier to understand.

A concrete entry point is the class used to create the source in the documentation, which we can trace step by step.

MySqlSource is built through the builder pattern (one of the classic 23 design patterns); we only need to know which parameters we can set, which is easy to understand.
 
 // Configure, through the builder, the parameters the job needs at startup
  public static class Builder<T> { // inner class of MySqlSource

       private int port = 3306; // default 3306 port
       private String hostname;
       private String[] databaseList;
       private String username;
       private String password;
       private Integer serverId;
       private String serverTimeZone; // time zone
       private String[] tableList;
       private Properties dbzProperties; // properties passed through to the debezium engine
       private StartupOptions startupOptions = StartupOptions.initial(); // controls where binlog consumption starts
       private DebeziumDeserializationSchema<T> deserializer; // defines the deserialization, i.e. what the data becomes, e.g. JSON or String
   
     // Once the parameters above are set, build() constructs the sourceFunction. The configuration is packed into a Properties object;
     // these are mostly the startup parameters debezium needs, see the debezium documentation for the details of each property
    public DebeziumSourceFunction<T> build() {
           Properties props = new Properties();
           props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
           // hard code server name, because we don't need to distinguish it, docs:
           // Logical name that identifies and provides a namespace for the particular MySQL
           // database
           // server/cluster being monitored. The logical name should be unique across all other
           // connectors,
           // since it is used as a prefix for all Kafka topic names emanating from this connector.
           // Only alphanumeric characters and underscores should be used.
           props.setProperty("database.server.name", DATABASE_SERVER_NAME);
           props.setProperty("database.hostname", checkNotNull(hostname));
           props.setProperty("database.user", checkNotNull(username));
           props.setProperty("database.password", checkNotNull(password));
           props.setProperty("database.port", String.valueOf(port));
           props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
           // debezium use "long" mode to handle unsigned bigint by default,
           // but it'll cause lose of precise when the value is larger than 2^63,
           // so use "precise" mode to avoid it.
           props.put("bigint.unsigned.handling.mode", "precise");

           if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }
           if (databaseList != null) {  props.setProperty("database.whitelist", String.join(",", databaseList)); }
           if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}
           if (serverTimeZone != null) {  props.setProperty("database.serverTimezone", serverTimeZone);   }

      // Determine the start consumption position; built in the SQL source builder, null if not set
           DebeziumOffset specificOffset = null;
           switch (startupOptions.startupMode) {
               case INITIAL:
                   props.setProperty("snapshot.mode", "initial");
                   break;

               case EARLIEST_OFFSET:
                   props.setProperty("snapshot.mode", "never");
                   break;

               case LATEST_OFFSET:
                   props.setProperty("snapshot.mode", "schema_only");
                   break;

               case SPECIFIC_OFFSETS:      
                   props.setProperty("snapshot.mode", "schema_only_recovery");
                   specificOffset = new DebeziumOffset();
                   Map<String, String> sourcePartition = new HashMap<>();
                   sourcePartition.put("server", DATABASE_SERVER_NAME);
                   specificOffset.setSourcePartition(sourcePartition);

                   Map<String, Object> sourceOffset = new HashMap<>();
                   sourceOffset.put("file", startupOptions.specificOffsetFile);
                   sourceOffset.put("pos", startupOptions.specificOffsetPos);
                   specificOffset.setSourceOffset(sourceOffset);
                   break;

               case TIMESTAMP:
                   checkNotNull(deserializer);
                   props.setProperty("snapshot.mode", "never");
                   deserializer =
                           new SeekBinlogToTimestampFilter<>(
                                   startupOptions.startupTimestampMillis, deserializer);
                   break;

               default:
                   throw new UnsupportedOperationException();
          }

           if (dbzProperties != null) {
               props.putAll(dbzProperties);
               // Add default configurations for compatibility when set the legacy mysql connector
               // implementation
               if (LEGACY_IMPLEMENTATION_VALUE.equals(
                       dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
                   props.put("transforms", "snapshotasinsert");
                   props.put(
                           "transforms.snapshotasinsert.type",
                           "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
              }
          }

       // Build the generic cdc sourceFunction, based on RichSourceFunction
           return new DebeziumSourceFunction<>(
                   deserializer, props, specificOffset,
             new MySqlValidator(props) // mysql validator: version info, binlog format == ROW, etc.
          );
      }
  }

The content above is mainly about assembling the parameters the source needs. A quick usage sketch follows, and then we step into DebeziumSourceFunction to look at the concrete implementation.
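For context, a minimal end-to-end sketch of this builder (connection details are placeholders; the package names follow the flink-cdc 2.x layout and are an assumption, check your version):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.mysql.MySqlSource;           // the legacy SourceFunction-based source
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class LegacyMySqlCdcExample {
    public static void main(String[] args) throws Exception {
        // Build the SourceFunction via the Builder shown above (placeholder connection details).
        SourceFunction<String> sourceFunction =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("inventory")            // databases to capture
                        .tableList("inventory.products")      // qualified as database.table
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new StringDebeziumDeserializationSchema()) // record.toString()
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).print().setParallelism(1); // the legacy source reads with a single parallelism
        env.execute("MySQL CDC (legacy SourceFunction)");
    }
}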

// Source code for reading the binlog, logminer, etc.
// Extends RichSourceFunction to implement the source side; implements CheckpointedFunction for the fault-tolerance guarantees and CheckpointListener to observe checkpoint completion
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
       implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
 // ------------------------------ some of the more important member variables; the unimportant ones are omitted ------------------------------------------

   // ----------------------------------State-------------------------------------------------
/* Used for state maintenance: after a failure/manual restart, restores the schema (structure of the records), unconsumed records (still in the queue, shown later), offsets, etc. */
   private transient volatile String restoredOffsetState;
   private transient ListState<byte[]> offsetState;
   private transient ListState<String> schemaRecordsState;

   // -----------------------------------Worker-----------------------------------------------
/* A single-threaded executor plus a DebeziumEngine (a Runnable implementation) used to read binlog data,
TODO so this involves interaction between multiple threads */
   private transient ExecutorService executor;
   private transient DebeziumEngine<?> engine;

    /* A consumer that reads data from the engine and puts it into the handover */
   private transient DebeziumChangeConsumer changeConsumer;

    /* Pulls data out of the handover */
   private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;

    /* The bridge through which the two threads (source, engine) exchange data */
   private transient Handover handover;

 // ---------------------------------------- We mainly cover the source's run method; the other methods are mostly fault-tolerance related --------------------------------------
 
   @Override
   public void run(SourceContext<T> sourceContext) throws Exception {
       // TODO parameters for the engine execution; not the key content here, see the official docs if interested
       properties.setProperty("name", "engine");
       properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
       if (restoredOffsetState != null) {
           properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
      }
       properties.setProperty("include.schema.changes", "false");
       properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
       properties.setProperty("tombstones.on.delete", "false");
       if (engineInstanceName == null) {
           engineInstanceName = UUID.randomUUID().toString();
      }    
       properties.setProperty(
               FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);  
       properties.setProperty("database.history", determineDatabase().getCanonicalName());
       String dbzHeartbeatPrefix =
               properties.getProperty(
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
       this.debeziumChangeFetcher =
               new DebeziumChangeFetcher<>(
                       sourceContext,
                       deserializer,
                        restoredOffsetState == null, // still in the snapshot phase, i.e. no restored state?
                       dbzHeartbeatPrefix,
                       handover);

       // Create and configure the engine
       this.engine =
               DebeziumEngine.create(Connect.class)
                      .using(properties) // parameters
                      .notifying(changeConsumer) // register the consumer that receives the data the engine reads (binlog/historical data)
                      .using(OffsetCommitPolicy.always()) // offset commit policy
                      .using(
                              (success, message, error) -> {
                                   if (success) {
                                       handover.close();
                                  } else {
                                       handover.reportError(error);
                                  }
                              })
                      .build();

       // Submit the engine to the thread pool for execution
       executor.execute(engine);
       debeziumStarted = true;

       // metric-related configuration
       MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
       // ....
     
       // Start the fetcher: loop pulling the latest data from the handover and emitting it downstream
       debeziumChangeFetcher.runFetchLoop();
  }
   
}

We have now seen the basic implementation of source.run; its main processing logic lives in DebeziumChangeConsumer, DebeziumChangeFetcher and Handover.

A brief introduction to the role and main methods of these three classes:

DebeziumChangeConsumer : consumes the data read by the engine

/* Implements the DebeziumEngine.ChangeConsumer interface with a fairly simple handleBatch method; the other two member methods are offset-related, not the focus */
// The engine thread calls handleBatch to hand over the data the engine consumed
public class DebeziumChangeConsumer
   implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
@Override
public void handleBatch(
   List<ChangeEvent<SourceRecord, SourceRecord>> events,
   RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
   currentCommitter = recordCommitter;
   // indirectly calls handover.produce, which blocks (waits if there are still unconsumed historical records)
   handover.produce(events);
} catch (Throwable e) {
   // Hold this exception in handover and trigger the fetcher to exit
   handover.reportError(e);
      }
 }
} 

DebeziumChangeFetcher : loops pulling from the handover the latest data the consumer received from the engine

public class DebeziumChangeFetcher<T> {

private final SourceFunction.SourceContext<T> sourceContext;
/* The lock that guarantees data emission and state updates are consistent */
private final Object checkpointLock;
/* Converts records into our custom type, e.g. JSON or String */
private final DebeziumDeserializationSchema<T> deserialization;

/* the custom collector defined below */
private final DebeziumCollector debeziumCollector;
 /* self-explanatory */
private final DebeziumOffset debeziumOffset;
/* Serializer for the offset stored in state */
private final DebeziumOffsetSerializer stateSerializer;
/* heartbeat related */
private final String heartbeatTopicPrefix;
/* whether we are still in the snapshot phase and need to consume the historical data */
private boolean isInDbSnapshotPhase;
private final Handover handover;

public void runFetchLoop() throws Exception {
  try {
      // Reads the historical MySQL data; don't be misled by the name
      if (isInDbSnapshotPhase) {
          List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
             
             synchronized (checkpointLock) {
                 LOG.info(
                         "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");              
                 handleBatch(events);
                 // Snapshot data may not fit in a single batch; snapshot reading must complete before we move on to the binlog
                 while (isRunning && isInDbSnapshotPhase) {
                     handleBatch(handover.pollNext());
                 }
             }
             LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
             }

         // Reaching here means the snapshot data is fully read; start reading binlog data in real time
         while (isRunning) {
             // the concrete data-handling logic; pollNext blocks
             handleBatch(handover.pollNext());
         }
         } catch (Handover.ClosedException e) {
         // ignore
         }
 }


 private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
        throws Exception {
    if (CollectionUtils.isEmpty(changeEvents)) {
        return;
    }
    this.processTime = System.currentTimeMillis();

     for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
         SourceRecord record = event.value();
         // the time fields are basically metric-related; don't dwell on them
         updateMessageTimestamp(record);
         fetchDelay = processTime - messageTimestamp;

         // offsets are advanced through the heartbeat mechanism
         if (isHeartbeatEvent(record)) {
             synchronized (checkpointLock) {
                 debeziumOffset.setSourcePartition(record.sourcePartition());
                 debeziumOffset.setSourceOffset(record.sourceOffset());
             }
             continue;
         }
               // Convert the record with the configured deserialization. StringDebeziumDeserializationSchema is the easiest to follow: it simply calls record.toString(). The goal is to turn the record debezium read into the format/type we want. debeziumCollector is the custom collector defined below; deserialize puts the converted data into its queue
         deserialization.deserialize(record, debeziumCollector);

       // If the record is no longer a snapshot record, the snapshot phase has finished and we switch to the binlog streaming flow
         if (!isSnapshotRecord(record)) {
             LOG.debug("Snapshot phase finishes.");
             isInDbSnapshotPhase = false; // used in runFetchLoop
         }
         // actually emit the data
         emitRecordsUnderCheckpointLock(
                 debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
       }
       }

 private void emitRecordsUnderCheckpointLock(
        Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {

     // Synchronization guarantees that emitting data and updating the offset happen atomically; the lock is reentrant (basic Java)
     synchronized (checkpointLock) {
         T record;
         // Drain the debeziumCollector.records queue, emitting each element downstream
         while ((record = records.poll()) != null) {
             emitDelay = System.currentTimeMillis() - messageTimestamp;
             // Emit to the downstream operator through the source context; from here on we are in Flink's own processing logic, outside the cdc code
             sourceContext.collect(record);
         }
         debeziumOffset.setSourcePartition(sourcePartition);
         debeziumOffset.setSourceOffset(sourceOffset);
     }
     }

   // Heartbeat mechanism, used to advance the offset
   private boolean isHeartbeatEvent(SourceRecord record) {
     String topic = record.topic();
     return topic != null && topic.startsWith(heartbeatTopicPrefix);
   }

 // -------------------------------- custom collector -------------------------------------------------------

 private class DebeziumCollector implements Collector<T> {
   private final Queue<T> records = new ArrayDeque<>();
   @Override
   public void collect(T record) {
     // Put the record into the queue; it is dequeued elsewhere and sent downstream
       records.add(record);
   }
 }
 }

Handover : the bridge through which the source thread and the engine thread exchange data

/* This class is accessed by two threads: pollNext is called by the DebeziumChangeFetcher and produce by the DebeziumChangeConsumer. Because multiple threads are involved, the code alone may be hard to follow; brush up on Java concurrency or debug through the call flow and it becomes much clearer */
@ThreadSafe // marks the class as thread-safe; it is touched by both the engine and the source thread, and the internal implementation guarantees safety
public class Handover implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
  private final Object lock = new Object();

  @GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注
  private List<ChangeEvent<SourceRecord, SourceRecord>> next;

  @GuardedBy("lock")
  private Throwable error;

  private boolean wakeupProducer;

  /* Called by the DebeziumChangeFetcher; when there is no data it waits. A waiting thread is not scheduled by the CPU, so the other thread can use the full time slice */
  public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
      // wait/notifyAll may only be used inside a synchronized block. Why this approach? With only two threads it is the simplest; with more threads you could use a juc lock or some other mechanism
      synchronized (lock) {
          // keep looping into wait while there is neither data nor an error, guarding against spurious wakeups
          while (next == null && error == null) {
              lock.wait();
          }
          List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
          // when the loop exits there must be either data or an error; no other case exists
          if (n != null) {
            // set next to null; produce uses this as its wait condition
              next = null;
            // wake the other waiting thread, which can only be the engine thread
              lock.notifyAll();
              return n;
          } else {
            // rethrow the error
              ExceptionUtils.rethrowException(error, error.getMessage());
         // the line above always throws; this return only removes a compiler warning
              return Collections.emptyList();
          }
      }
  }

  public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
          throws InterruptedException {

      checkNotNull(element);

      synchronized (lock) {
        // wait while next is still unconsumed
          while (next != null && !wakeupProducer) {
              lock.wait();
          }

          wakeupProducer = false;

          // if there is an error, throw it; otherwise accept the new data and wake the fetcher thread
          if (error != null) {
              ExceptionUtils.rethrow(error, error.getMessage());
          } else {
              next = element;
              lock.notifyAll();
          }
      }
  }
}
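To make the two-thread interplay concrete, here is a self-contained toy version of the same wait/notify handover (a sketch of the pattern only, not flink-cdc code; capacity is one batch, like the single next slot above):

import java.util.List;

// Toy handover: one producer thread, one consumer thread, capacity of one batch.
class ToyHandover {
    private final Object lock = new Object();
    private List<String> next; // the pending batch, null when empty

    List<String> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {      // loop to guard against spurious wakeups
                lock.wait();
            }
            List<String> batch = next;
            next = null;                // free the slot
            lock.notifyAll();           // wake the producer
            return batch;
        }
    }

    void produce(List<String> batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {      // slot occupied: wait until consumed
                lock.wait();
            }
            next = batch;
            lock.notifyAll();           // wake the consumer
        }
    }

    public static void main(String[] args) throws Exception {
        ToyHandover handover = new ToyHandover();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    handover.produce(List.of("batch-" + i));
                }
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println(handover.pollNext()); // prints batch-0..2 in order
        }
        producer.join();
    }
}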

That completes the main code of the cdc implementation based on RichSourceFunction. It is not actually that hard, but the authors have already thought through many corner cases, the abstractions are clean, extension is easy, and the API is pleasant for us developers to use.

III. mysql-cdc source code: the new Source interface implementation

Flink has provided a new source interface since 1.11; you can preview it here: https://issues.apache.org/jira/browse/FLINK-10740

A brief introduction. SourceReader : reads the data of a split, e.g. one partition or one chunk; it is not limited to a single partition, it depends on your implementation. SplitEnumerator : splits the data source or discovers partitions, e.g. discovering Kafka partitions or dividing files into blocks. This is simplified; reality is a bit more involved, so implementing a source against the new interface is genuinely hard at first, though it becomes routine once familiar. Some terms up front: a split can be thought of as a slice. In mysql-cdc, hypothetically, it is a piece of a table, say from start key 1 to end key 10; the split records those boundaries, and a readTask reads the data the split describes (one readTask will usually execute more than one split). "snapshot" means reading the database's full historical data; "binlog" means that once the snapshot phase ends we start reading the binlog data. The snapshot phase runs first, then the binlog phase.
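As a back-of-the-envelope illustration of snapshot splitting by primary key (a hypothetical sketch; the real chunk splitter also handles non-numeric keys and uneven key distributions):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: divide the primary-key range [min, max] into fixed-size chunks.
// Each long[]{start, end} plays the role of a snapshot split's boundaries.
class ChunkSketch {
    static List<long[]> splitByKey(long min, long max, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        for (long start = min; start <= max; start += chunkSize) {
            chunks.add(new long[] {start, Math.min(start + chunkSize - 1, max)});
        }
        return chunks;
    }

    public static void main(String[] args) {
        // keys 1..10, chunk size 4 -> [1,4], [5,8], [9,10]
        for (long[] c : splitByKey(1, 10, 4)) {
            System.out.println("split: [" + c[0] + ", " + c[1] + "]");
        }
    }
}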

The job is assembled the same way as in the old version; only the internal execution logic differs. The new-source cdc implementation is more complex and touches many more pieces, so it can be disorienting; debug through the source yourself afterwards. Because there is so much code, we focus on the important parts and skip the rest.

// Implements two interfaces: Source, and ResultTypeQueryable (a simple interface that returns the result type info); the main code is in the Source implementation
// T is the output type, MySqlSplit is the mysql split type, PendingSplitsState is the enumerator's state object
public class MySqlSource<T>
       implements Source<T, MySqlSplit, PendingSplitsState>, ResultTypeQueryable<T> {
 

   private final MySqlSourceConfigFactory configFactory;
   private final DebeziumDeserializationSchema<T> deserializationSchema;

   /* The source's parameters are assembled through the builder pattern: parameters are added through MySqlSourceConfigFactory, and build() constructs a MySqlSource with that factory as argument.
    ------------------------------------- the relationships between the configs ------------------------------------------------
  MySqlSourceConfigFactory creates a MySqlSourceConfig for each subtask
  MySqlSourceConfig can build a MySqlConnectorConfig
  MySqlConnection is built via DebeziumUtils.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())
     The configs are a bit confusing and the names don't help; they will be mentioned again when used. For now just keep an impression and don't get lost in the configuration
   */
   public static <T> MySqlSourceBuilder<T> builder() {
       return new MySqlSourceBuilder<>();
  }

 // created by MySqlSourceBuilder.build
   MySqlSource(
           MySqlSourceConfigFactory configFactory,
            DebeziumDeserializationSchema<T> deserializationSchema // same as the old source's deserialization
  ) {
       this.configFactory = configFactory;
       this.deserializationSchema = deserializationSchema;
  }

    @Override  // unified batch/stream source: declares boundedness, a feature of the new source interface
   public Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED; }

   /* build the sourceReader */
   @Override
   public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
           throws Exception {
     // as mentioned above, create the config for this subtask index
       MySqlSourceConfig sourceConfig =
               configFactory.createConfig(readerContext.getIndexOfSubtask());
        // a blocking queue used for multi-thread interaction; no need to go deeper
       FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
               new FutureCompletingBlockingQueue<>();
     // metric related
       final MySqlSourceReaderMetrics sourceReaderMetrics =
               new MySqlSourceReaderMetrics(readerContext.metricGroup());
       sourceReaderMetrics.registerMetrics();
     // build a SplitReader through a supplier function for decoupling; focus on the MySqlSplitReader implementation
       Supplier<MySqlSplitReader> splitReaderSupplier =
         // give each reader its config and its subtask index
              () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
   
     // build the concrete sourceReader
       return new MySqlSourceReader<>(
               elementsQueue,
               splitReaderSupplier,
               new MySqlRecordEmitter<>(
                       deserializationSchema,
                       sourceReaderMetrics,
                       sourceConfig.isIncludeSchemaChanges()),
               readerContext.getConfiguration(),
               readerContext,
               sourceConfig);
  }

   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext) {
     // only created once, so a single sourceConfig is enough
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
// validate mysql
       final MySqlValidator validator = new MySqlValidator(sourceConfig);
       validator.validate();

       final MySqlSplitAssigner splitAssigner;
     // If the startup mode is initial, first read the table data (called snapshot in the code) and then continue with the binlog; otherwise start reading directly from the binlog
       if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
           try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
               final List<TableId> remainingTables = discoverCapturedTables(jdbc, sourceConfig);
               boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
               splitAssigner =
                 // contains the split logic for both snapshot and binlog
                       new MySqlHybridSplitAssigner(
                               sourceConfig,
                               enumContext.currentParallelism(),
                               remainingTables,
                               isTableIdCaseSensitive);
          } catch (Exception e) {
               throw new FlinkRuntimeException(
                       "Failed to discover captured tables for enumerator", e);
          }
      } else {
         // only the binlog split logic
           splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
      }
// create the corresponding SplitEnumerator, which builds splits for the readers to consume
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }

// Restore the SplitEnumerator, e.g. after a failure restart: it is rebuilt from the checkpoint so previously unfinished reads can continue
   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

       final MySqlSplitAssigner splitAssigner;
       if (checkpoint instanceof HybridPendingSplitsState) {
           splitAssigner =
                   new MySqlHybridSplitAssigner(
                           sourceConfig,
                           enumContext.currentParallelism(),
                          (HybridPendingSplitsState) checkpoint);
      } else if (checkpoint instanceof BinlogPendingSplitsState) {
           splitAssigner =
                   new MySqlBinlogSplitAssigner( sourceConfig, (BinlogPendingSplitsState) checkpoint);
      } else {
            throw new UnsupportedOperationException("Unsupported restored PendingSplitsState: " + checkpoint);
      }
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }

 // ----------------- fault tolerance, not the focus -----------------
   @Override
   public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() { return MySqlSplitSerializer.INSTANCE; }

   @Override
   public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() { return new PendingSplitsStateSerializer(getSplitSerializer());}

// extraction of the produced type
   @Override
   public TypeInformation<T> getProducedType() {return deserializationSchema.getProducedType();}
}

The code above shows the source implementation: it mainly builds the sourceReader and the splitEnumerator plus the fault-tolerance pieces, with the processing logic encapsulated in those objects. A short usage sketch follows, and then we dissect the internals step by step.
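Before diving in, a minimal usage sketch of this new Source (placeholders again; the builder methods match the flink-cdc 2.x examples, so treat exact names as assumptions for your version):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;  // the new Source-based implementation
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class NewMySqlCdcExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("inventory")
                        .tableList("inventory.products")
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema()) // changelog as JSON strings
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // the incremental snapshot source relies on checkpoints
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .setParallelism(4)          // snapshot splits can now be read in parallel
           .print();
        env.execute("MySQL CDC (new Source)");
    }
}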

/* Before looking at anything else, let's see how mysql is split: snapshot splits are created by primary key, while the binlog split simply starts consuming from the current offset.
This is the hybrid split assigner; there are also separate binlog and snapshot splitAssigners, but here we follow the overall logic, and you can read the specific ones yourself.
To explain: first the historical mysql data is read (the snapshot phase), then consumption starts from the current mysql-binlog position. The point of the hybrid assigner is exactly that: read the full data first, then read from the latest binlog, which together complete the cdc read path. */
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {

    private final int splitMetaGroupSize;

    private boolean isBinlogSplitAssigned;

    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive) {
        this(
             // create the snapshot split assigner
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
                false,
                sourceConfig.getSplitMetaGroupSize());
    }

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            HybridPendingSplitsState checkpoint) {
        this(
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
                checkpoint.isBinlogSplitAssigned(),
                sourceConfig.getSplitMetaGroupSize());
    }

    private MySqlHybridSplitAssigner(
            MySqlSnapshotSplitAssigner snapshotSplitAssigner,
            boolean isBinlogSplitAssigned,
            int splitMetaGroupSize) {
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
    }

    @Override
    public void open() {
        snapshotSplitAssigner.open();
    }
  
  // Returns the next split, or empty if there is none; Optional is a JDK 8 feature for avoiding null pointers
    @Override
    public Optional<MySqlSplit> getNext() {
      // the methods below are self-explanatory
        if (snapshotSplitAssigner.noMoreSplits()) {
            if (isBinlogSplitAssigned) {
                return Optional.empty();
            } else if (snapshotSplitAssigner.isFinished()) { // once the snapshot is finished, start the binlog split flow
                // we need to wait snapshot-assigner to be finished before
                // assigning the binlog split. Otherwise, records emitted from binlog split
                // might be out-of-order in terms of same primary key with snapshot splits.
                isBinlogSplitAssigned = true;
                return Optional.of(createBinlogSplit());
            } else {
                // binlog split is not ready by now
                return Optional.empty();
            }
        } else {
            // snapshot assigner still have remaining splits, assign split from it
            return snapshotSplitAssigner.getNext();
        }
    }
  
   // whether the splitAssigner is waiting for the finished-splits callback, i.e. onFinishedSplits
    @Override
    public boolean waitingForFinishedSplits() {
        return snapshotSplitAssigner.waitingForFinishedSplits();
    }
  // Get the finished splits together with their metadata; the binlog split can be generated from the finished snapshot splits
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return snapshotSplitAssigner.getFinishedSplitInfos();
    }
  // Handle finished splits with their final binlog offsets, which determine when to generate the binlog split and what it contains; this is the callback
    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }

   // Add a batch of splits back to this splitAssigner; called when some splits failed to process and must be reassigned
    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        List<MySqlSplit> snapshotSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
            } else {
                // we don't store the split, but will re-create binlog split later
                isBinlogSplitAssigned = false;
            }
        }
        snapshotSplitAssigner.addSplits(snapshotSplits);
    }
 // ---------------------------- checkpoint / fault tolerance ----------------------------------------
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(
                snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void close() {
        snapshotSplitAssigner.close();
    }

    // ------------------------------------ binlog split part -------------------------------------------
  // Build the binlog split: the process of deriving the binlog split from the completed snapshot splits; the split code itself is simple enough to read on your own
   // In short, it describes the binlog split in terms of the snapshot splits (snapshot is split by primary key) plus the table schemas
    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());

        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
}
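To make the getNext() contract above concrete, here is a toy assigner with the same Optional-based shape (invented MiniAssigner; the real hybrid assigner additionally waits until all snapshot splits are finished before handing out the binlog split):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Toy assigner mirroring the Optional-based getNext() contract of MySqlHybridSplitAssigner.
class MiniAssigner {
    private final Deque<String> snapshotSplits = new ArrayDeque<>(List.of("snap-0", "snap-1"));
    private boolean binlogAssigned;

    Optional<String> getNext() {
        if (!snapshotSplits.isEmpty()) {
            return Optional.of(snapshotSplits.poll());    // snapshot splits first
        } else if (!binlogAssigned) {
            binlogAssigned = true;                        // then exactly one binlog split
            return Optional.of("binlog");
        }
        return Optional.empty();                          // nothing left to assign
    }

    public static void main(String[] args) {
        MiniAssigner assigner = new MiniAssigner();
        Optional<String> split;
        while ((split = assigner.getNext()).isPresent()) {
            System.out.println("assign " + split.get()); // snap-0, snap-1, binlog
        }
    }
}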

Now let's look at the sourceReader and the SplitEnumerator.

sourceReader :


/* SingleThreadMultiplexSourceReaderBase */
public class MySqlSourceReader<T>
     extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {

 private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

 private final MySqlSourceConfig sourceConfig;
 private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
 private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
 private final int subtaskId;

 public MySqlSourceReader(
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
         Supplier<MySqlSplitReader> splitReaderSupplier,
         RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
         Configuration config,
         SourceReaderContext context,
         MySqlSourceConfig sourceConfig) {
     super(
             elementQueue,
          // a single-threaded fetcher manager that performs the reads
          // Rough flow:
          // SingleThreadFetcherManager.createSplitFetcher builds a SplitFetcher (a Runnable), which in turn builds a fetchTask. SplitFetcher.run loops calling this.runOnce(), which keeps invoking fetchTask.run() to read data; run() calls MySqlSplitReader.fetch, which returns what the reader read and puts it into the elementsQueue. As always, code involving multiple threads is the hardest part to follow
             new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
             recordEmitter,
             config,
             context);
     this.sourceConfig = sourceConfig;
     this.finishedUnackedSplits = new HashMap<>();
     this.uncompletedBinlogSplits = new HashMap<>();
     this.subtaskId = context.getIndexOfSubtask();
 }
  // start the reader
 @Override
 public void start() {
     if (getNumberOfCurrentlyAssignedSplits() == 0) {
        // Send a split request to the splitEnumerator; this ends up in SplitEnumerator.handleSplitRequest(int, String), carrying the requesting parallel reader's subtask id and hostname
         context.sendSplitRequest();
     }
 }
  // When the reader is assigned a new split, initialize a state object for that split
 @Override
 protected MySqlSplitState initializedState(MySqlSplit split) {
     if (split.isSnapshotSplit()) {
         return new MySqlSnapshotSplitState(split.asSnapshotSplit());
     } else {
         return new MySqlBinlogSplitState(split.asBinlogSplit());
     }
 }

 @Override // fault tolerance, skip
 public List<MySqlSplit> snapshotState(long checkpointId) {
     // unfinished splits
     List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
     // add finished snapshot splits that didn't receive ack yet
     stateSplits.addAll(finishedUnackedSplits.values());
     // add binlog splits who are uncompleted
     stateSplits.addAll(uncompletedBinlogSplits.values());
     return stateSplits;
 }
  // Clean up the state of finished splits; not the focus
 @Override
 protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
     for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
         MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
         checkState(
                 mySqlSplit.isSnapshotSplit(),
                 String.format(
                         "Only snapshot split could finish, but the actual split is binlog split %s",
                         mySqlSplit));
         finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
     }
     reportFinishedSnapshotSplitsIfNeed();
     context.sendSplitRequest();
 }
  /* Adds the list of splits this reader should read. Called when the splitEnumerator assigns a split through the splitEnumeratorContext,
i.e. via context.assignSplit(SourceSplit, int) or context.assignSplits(SplitsAssignment).
 */
 @Override
 public void addSplits(List<MySqlSplit> splits) {
     List<MySqlSplit> unfinishedSplits = new ArrayList<>();
     for (MySqlSplit split : splits) {
        // snapshot split or binlog split?
         if (split.isSnapshotSplit()) {
             // if the split has already been fully read, put it in the finished set, otherwise in the unfinished set
             MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
             if (snapshotSplit.isSnapshotReadFinished()) {
                 finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
             } else {
                 unfinishedSplits.add(split);
             }
         } else {
             if (!split.asBinlogSplit().isCompletedSplit()) {
                 // if the binlog split is incomplete, put it in the uncompleted map and send a request for the binlog split meta to the splitEnumerator
                 uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                 requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
             } else {
                 // remove it from the uncompleted map; uncompleted means the split meta is missing
                 uncompletedBinlogSplits.remove(split.splitId());
                 // create the binlog split carrying the table schema information
                 MySqlBinlogSplit mySqlBinlogSplit =
                         discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                 // add it to the unfinished splits; it will be read later
                 unfinishedSplits.add(mySqlBinlogSplit);
             }
         }
     }
     // notify split enumerator again about the finished unacked snapshot splits
     reportFinishedSnapshotSplitsIfNeed();
     // add all un-finished splits (including binlog split) to SourceReaderBase
      // TODO calling super.addSplits starts the fetcherManager and kicks off the actual reading
     super.addSplits(unfinishedSplits);
 }

 private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
     final String splitId = split.splitId();
    // fill in the tableSchemas if missing; if already present, return the split as-is
     if (split.getTableSchemas().isEmpty()) {
         try (MySqlConnection jdbc =
              // static method that builds a MySqlConnection; think of it as a jdbc connection, no need to go deeper
                 DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
             Map<TableId, TableChanges.TableChange> tableSchemas =
                // static method that derives the tableId -> TableChange mapping from the database and table list given to the sourceBuilder, needed later when reading; a utility, no need to go deeper
              TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
             LOG.info("The table schema discovery for binlog split {} success", splitId);
             // static method that builds a MySqlBinlogSplit carrying the tableSchemas, no need to go deeper
             return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
         } catch (SQLException e) {
             LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
             throw new FlinkRuntimeException(e);
         }
     } else {

         LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);
         return split;
     }
 }

// Handle custom source events received from the splitEnumerator; mirrors the splitEnumerator side
 @Override
 public void handleSourceEvents(SourceEvent sourceEvent) {
     if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
         FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
         LOG.debug(
                 "The subtask {} receives ack event for {} from enumerator.",
                 subtaskId,
                 ackEvent.getFinishedSplits());
         for (String splitId : ackEvent.getFinishedSplits()) {
             this.finishedUnackedSplits.remove(splitId);
         }
     } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
         // report finished snapshot splits
         LOG.debug(
                 "The subtask {} receives request to report finished snapshot splits.",
                 subtaskId);
         reportFinishedSnapshotSplitsIfNeed();
     } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
         LOG.debug(
                 "The subtask {} receives binlog meta with group id {}.",
                 subtaskId,
                 ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
         fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
     } else {
         super.handleSourceEvents(sourceEvent);
     }
 }
   // send the event requesting the binlog split meta
 private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
     final String splitId = binlogSplit.splitId();
     if (!binlogSplit.isCompletedSplit()) {
         final int nextMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         BinlogSplitMetaRequestEvent splitMetaRequestEvent =
                 new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
         context.sendSourceEventToCoordinator(splitMetaRequestEvent);
     } else {
         LOG.info("The meta of binlog split {} has been collected success", splitId);
         this.addSplits(Arrays.asList(binlogSplit));
     }
 }
  // After sending the meta request event we receive the binlog split meta and must fill it into the binlogSplit
 private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
     MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
     if (binlogSplit != null) {
         final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
         final int expectedMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         if (receivedMetaGroupId == expectedMetaGroupId) {
             List<FinishedSnapshotSplitInfo> metaDataGroup =
                     metadataEvent.getMetaGroup().stream()
                             .map(FinishedSnapshotSplitInfo::deserialize)
                             .collect(Collectors.toList());
             uncompletedBinlogSplits.put(
                     binlogSplit.splitId(),
                     MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
             LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
         } else {
             LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",metadataEvent.getSplitId(), receivedMetaGroupId,expectedMetaGroupId);
         }
       // keep requesting meta events if still incomplete
         requestBinlogSplitMetaIfNeeded(binlogSplit);
     } else {
         LOG.warn( "Received binlog meta event for split {}, but the uncompleted split map does not contain it", metadataEvent.getSplitId());
     }
 }
   // turn the mutable split state into an immutable split
 @Override
 protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); }
}

splitEnumerator :

  • handles the sourceReader's split requests
  • assigns splits to the sourceReader
// implements SplitEnumerator and overrides its methods
public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState> {
    private static final long CHECK_EVENT_INTERVAL = 30_000L;
    private final SplitEnumeratorContext<MySqlSplit> context;
    private final MySqlSourceConfig sourceConfig;
    private final MySqlSplitAssigner splitAssigner;
    // using TreeSet to prefer assigning binlog split to task-0 for easier debug
    private final TreeSet<Integer> readersAwaitingSplit;
    private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;

    public MySqlSourceEnumerator(
            SplitEnumeratorContext<MySqlSplit> context,
            MySqlSourceConfig sourceConfig,
            MySqlSplitAssigner splitAssigner) {
        // the context object passed in by source.createEnumerator
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet<>();
    }

    @Override
    public void start() {
        splitAssigner.open(); // call the splitAssigner's open; see each splitAssigner implementation for the details
        // Register a callable that is invoked periodically; its main job is to check for missed notifications after a reader communication failure or restart. Not the focus
        this.context.callAsync(
                this::getRegisteredReader,
                this::syncWithReaders,
                CHECK_EVENT_INTERVAL,
                CHECK_EVENT_INTERVAL);
    }

    // Handles split requests: called when a reader with the given subtask id invokes SourceReaderContext.sendSplitRequest().
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
    // put the requesting subtask id into the waiting set
        readersAwaitingSplit.add(subtaskId);
       // assign splits to the subtasks in the waiting set
        assignSplits();
    }
  // Add splits back to the splitEnumerator; this only happens for splits assigned after the last successful checkpoint, which therefore need to be reprocessed.
    @Override
    public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
        LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    // handle custom events from the sourceReader
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
       // SourceEvent sent by the sourceReader to notify that snapshot splits have been read and the binlog positions are consistent
        if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
            LOG.info(
                    "The enumerator receives finished split offsets {} from subtask {}.",
                    sourceEvent,
                    subtaskId);
            FinishedSnapshotSplitsReportEvent reportEvent =
                    (FinishedSnapshotSplitsReportEvent) sourceEvent;
            Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();
            // covered above in the splitAssigner
            splitAssigner.onFinishedSplits(finishedOffsets);
            // send an ACK event back to the reader confirming the snapshot splits
            FinishedSnapshotSplitsAckEvent ackEvent =
                    new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));
            context.sendEventToSourceReader(subtaskId, ackEvent);
        } 
       // SourceEvent from the sourceReader requesting the binlog metadata, i.e. a BinlogSplitMetaRequestEvent
      else if (sourceEvent instanceof BinlogSplitMetaRequestEvent) {
            LOG.debug(
                    "The enumerator receives request for binlog split meta from subtask {}.",
                    subtaskId);
          // send the binlog meta
            sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
        }
    }

    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        splitAssigner.notifyCheckpointComplete(checkpointId);
        // binlog split may be available after checkpoint complete
        assignSplits();
    }

    // ------------------------------------------------------------------------------------------
  // assign splits to the subtasks in the waiting set
    private void assignSplits() {
      // the TreeSet iterator is sorted, so subtask ids are handled in ascending order
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            // if the requesting reader failed in the meantime, remove it from the waiting set
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }
            Optional<MySqlSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final MySqlSplit mySqlSplit = split.get();
                // assign the split to the subtask
                context.assignSplit(mySqlSplit, nextAwaiting);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
            } else {
                // there is no available splits by now, skip assigning
               // the splitAssigner may hand out empty values; they are filtered out here
                break;
            }
        }
    }

  // send the binlog meta event to the reader
    private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
        // if the binlog meta is null, initialize it first
        if (binlogSplitMeta == null) {
            final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
                    splitAssigner.getFinishedSplitInfos();
            if (finishedSnapshotSplitInfos.isEmpty()) {
                LOG.error(
                        "The assigner offer empty finished split information, this should not happen");
                throw new FlinkRuntimeException(
                        "The assigner offer empty finished split information, this should not happen");
            }
            binlogSplitMeta =
                    Lists.partition(
                            finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
        }
        final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

        if (binlogSplitMeta.size() > requestMetaGroupId) {
           // fetch the corresponding FinishedSnapshotSplitInfo list, serialize it, and build the meta event
            List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
            BinlogSplitMetaEvent metadataEvent =
                    new BinlogSplitMetaEvent(
                            requestEvent.getSplitId(),
                            requestMetaGroupId,
                            metaToSend.stream()
                                    .map(FinishedSnapshotSplitInfo::serialize)
                                    .collect(Collectors.toList()));
           // send the generated meta event to the reader
            context.sendEventToSourceReader(subTask, metadataEvent);
        } else {
            LOG.error(
                    "Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
                    requestMetaGroupId,
                    binlogSplitMeta.size() - 1);
        }
    }
}

In the two classes above we did not see the concrete data-reading logic; in fact the job starts reading as soon as addSplits() is called. The call chain is fairly long, so here it is step by step, showing how the code enters the execution logic:

  • the fetcherManager created in the sourceReader is stored in a field of the parent class
  • when the sourceReader's addSplits is called, it calls the parent class's addSplits
  • which calls addSplits on the fetcherManager we passed in
  • the fetcherManager subclass does not override addSplits, so the parent's method runs directly: if no fetcher has been started yet, it creates one (a Runnable) and submits it to the thread pool for execution (see the sketch after this list)
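A stripped-down model of that chain (all names invented; it only mirrors the lazily-started fetcher thread plus the blocking task queue):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// All names invented: models how addSplits lazily starts the single fetcher thread.
class MiniFetcherManager {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private MiniFetcher fetcher; // null until the first splits arrive

    synchronized void addSplits(List<String> splits) {
        if (fetcher == null) {               // first call: create and start the fetcher
            fetcher = new MiniFetcher();
            executor.execute(fetcher);
        }
        fetcher.enqueue(splits);             // hand the splits to the running fetcher
    }
}

class MiniFetcher implements Runnable {
    private final BlockingQueue<List<String>> taskQueue = new LinkedBlockingQueue<>();

    void enqueue(List<String> splits) {
        taskQueue.offer(splits);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // blocks like taskQueue.take() in SplitFetcher.runOnce()
                List<String> splits = taskQueue.take();
                System.out.println("reading splits: " + splits);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}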

Now our fetcher is running; let's see what it actually does (remember the queue passed in above: the data the fetcher reads is put into that queue). What createFetcher actually creates is a SplitFetcher, a class provided by Flink's new source framework.

/* SplitFetcher is a Runnable, so we can go straight into its run method.
  The flow first:
   1. The constructor receives a splitReader, which is responsible for the actual reading (here a MySqlSplitReader).
   2. The constructor also builds a fetchTask; once run() starts, the task executes. Recall that after startFetcher() we called the fetcher's addSplits, which wraps the splits into tasks and puts them on the taskQueue.
   3. The idle/wakeup handling and other unimportant logic has been trimmed here; don't spend time on it, it is not cdc content.
*/
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

    private final int id;
    private final BlockingDeque<SplitFetcherTask> taskQueue;
    // track the assigned splits so we can suspend the reader when there is no splits assigned.
    private final Map<String, SplitT> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final AtomicBoolean wakeUp;
    private final AtomicBoolean closed;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;
    private volatile boolean isIdle; // field trimmed from the excerpt above but assigned below

    private final Object lock = new Object();

 
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook) {
        this.id = id;
       // the task queue: holds WAKEUP_TASK (used to wake the fetcher thread in certain situations) as well as our reading tasks
        this.taskQueue = new LinkedBlockingDeque<>();
       // the data that is read is put into this queue
        this.elementsQueue = elementsQueue;
        this.assignedSplits = new HashMap<>();
        this.splitReader = splitReader;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
        this.isIdle = true;
        this.wakeUp = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
    // wrap the given splitReader into the fetchTask so the task can run directly once started
        this.fetchTask =
                new FetchTask<>(
                        splitReader,
                        elementsQueue,
                        ids -> {
                            ids.forEach(assignedSplits::remove);
                            splitFinishedHook.accept(ids);
                            LOG.info("Finished reading from splits {}", ids);
                        },
                        id);
    }

    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            while (!closed.get()) {
              // the concrete logic of each loop iteration
                runOnce();
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            }
            LOG.info("Split fetcher {} exited.", id);
            // This executes after possible errorHandler.accept(t). If these operations bear
            // a happens-before relation, then we can checking side effect of errorHandler.accept(t)
            // to know whether it happened after observing side effect of shutdownHook.run().
            shutdownHook.run();
        }
    }

    /** Package private method to help unit test. */
    void runOnce() {
        try {
            if (shouldRunFetchTask()) {
                runningTask = fetchTask;
            } else {
                runningTask = taskQueue.take();
            }
            
            LOG.debug("Prepare to run {}", runningTask);
           // the task runs here; next we go straight into the task to see its concrete logic
            if (!wakeUp.get() && runningTask.run()) {
                LOG.debug("Finished running task {}", runningTask);
                // the task has finished running. Set it to null so it won't be enqueued.
                runningTask = null;
                checkAndSetIdle();
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }
        // If the task is not null that means this task needs to be re-executed. This only
        // happens when the task is the fetching task or the task was interrupted.
        maybeEnqueueTask(runningTask);
        synchronized (wakeUp) {
            // Set the running task to null. It is necessary for the shutdown method to avoid
            // unnecessarily interrupt the running task.
            runningTask = null;
            // Set the wakeUp flag to false.
            wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    /* Called when the fetcher is created, or on an already-running fetcher; the call chain is in the steps listed above */
    public void addSplits(List<SplitT> splitsToAdd) {
        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
        wakeUp(true);
    }
  public void enqueueTask(SplitFetcherTask task) {
        synchronized (lock) {
            taskQueue.offer(task);
            isIdle = false;
        }
    }
}

We step into the fetchTask; only the task logic matters here.

class FetchTask<E, SplitT> implements SplitFetcherTask { // the fetchTask built in the SplitFetcher constructor
 @Override
    public boolean run() throws IOException {
        try {
            if (!isWakenUp() && lastRecords == null) {
              // returns a MySqlRecords (public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord>)
              // calls the splitReader object passed in when the fetchTask was created; ultimately it is the reader's fetch method that really fetches the data
                lastRecords = splitReader.fetch();
            }

            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
               // put the fetched records into the queue
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            throw new IOException("Source fetch execution was interrupted", e);
        } finally {
            // clear a potential wakeup effect; in the full code this cleanup lives in a finally block
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        return true;
    }
}

With the above we now basically understand how, at the Flink level, the cdc data-reading code ends up being invoked. Now let's look at how the main reading code itself is implemented.

currentReader.pollSplitRecords() : a quick note on currentReader. There are two main implementations, BinlogSplitReader and SnapshotSplitReader; data is read differently depending on the split type. submitSplit creates a readTask that reads the data of the given split, and the results are put into the queue inside StatefulTaskContext. fetch first submits the split so the read starts, then pollSplitRecords pulls data via queue.poll, a blocking call that is interrupted with an exception if it times out.
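The submit-then-poll shape can be sketched like this (invented names; only the BlockingQueue.poll(timeout) call is real API, and the real readers poll debezium's change-event queue instead):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Invented sketch of the submit-then-poll pattern used by the split readers.
class MiniSplitReader {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();

    // submitSplit: start a background read task whose results land in the queue
    void submitSplit(String splitId) {
        executor.execute(() -> queue.offer(List.of(splitId + "-row1", splitId + "-row2")));
    }

    // pollSplitRecords: block up to a timeout, like the queue inside StatefulTaskContext
    List<String> pollSplitRecords() throws InterruptedException {
        List<String> records = queue.poll(5, TimeUnit.SECONDS);
        if (records == null) {
            throw new InterruptedException("poll timed out without data");
        }
        return records;
    }

    public static void main(String[] args) throws Exception {
        MiniSplitReader reader = new MiniSplitReader();
        reader.submitSplit("split-1");
        System.out.println(reader.pollSplitRecords()); // [split-1-row1, split-1-row2]
        reader.executor.shutdown();
    }
}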

代码语言:javascript复制
public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;
  
    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // before fetching, check currentReader and create the matching reader (binlog/snapshot) for the next split
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
           // call the concrete DebeziumReader to do the work
           // inside the reader, the StatefulTaskContext queue's poll method is called to pull data; it blocks (optionally with a timeout), and if nothing arrives within the interval it is interrupted and throws InterruptedException
            dataIt = currentReader.pollSplitRecords(); 
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                ? finishedSnapshotSplit() // no data read: return an empty batch; this also sets currentSplitId to null, marking the split as finished
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        splits.addAll(splitsChanges.splits());
    }

 
    private void checkSplitOrStartNext() throws IOException {
        // the binlog reader should keep alive
        if (currentReader instanceof BinlogSplitReader) {
            return;
        }

        if (canAssignNextSplit()) {
            final MySqlSplit nextSplit = splits.poll();
            if (nextSplit == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            currentSplitId = nextSplit.splitId();

            if (nextSplit.isSnapshotSplit()) {
                if (currentReader == null) {
                    final MySqlConnection jdbcConnection =
                            createMySqlConnection(sourceConfig.getDbzConfiguration());
                    final BinaryLogClient binaryLogClient =
                            createBinaryClient(sourceConfig.getDbzConfiguration());
                    final StatefulTaskContext statefulTaskContext =
                            new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                    currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                }
            } else {
                // point from snapshot split to binlog split
                if (currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    currentReader.close();
                }
                final MySqlConnection jdbcConnection =
                        createMySqlConnection(sourceConfig.getDbzConfiguration());
                final BinaryLogClient binaryLogClient =
                        createBinaryClient(sourceConfig.getDbzConfiguration());
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                LOG.info("BinlogSplitReader is created.");
            }
           // submit a split to the reader: submitSplit creates a read task that reads the split's data into StatefulTaskContext's queue; the read task is run on a thread pool
            currentReader.submitSplit(nextSplit);
        }
    }
    private boolean canAssignNextSplit() {
        return currentReader == null || currentReader.isFinished();
    }
}

The basic execution flow above is now complete; all that remains is the actual data reading. Let's follow the code step by step to see how the reading is actually done. Since the CDC code base is large, some of it is filtered out here; the binlog read task is too tangled to walk through line by line, so we only outline its flow later.

Let's walk through the readers' handling logic in turn.

1. The checkSplitOrStartNext method

The key call in this method is submitSplit, which kicks off the data-reading flow below.

代码语言:javascript复制

// -------------------------   SnapshotSplitReader.submitSplit  ------------------------------------------

public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        statefulTaskContext.configure(currentSnapshotSplit);
     // grab the context's queue; pollSplitRecords needs it later
        this.queue = statefulTaskContext.getQueue();
        this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
     // the main read logic lives in the read task
        this.splitSnapshotReadTask =
                new MySqlSnapshotSplitReadTask(
                        statefulTaskContext.getConnectorConfig(),
                        statefulTaskContext.getOffsetContext(),
                        statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                        statefulTaskContext.getDatabaseSchema(),
                        statefulTaskContext.getConnection(),
                        statefulTaskContext.getDispatcher(),
                        statefulTaskContext.getTopicSelector(),
                        StatefulTaskContext.getClock(),
                        currentSnapshotSplit);
     // submit a runnable to the executor, which mainly runs the read task's execute method
        executor.submit(
                () -> {
                    try {
                        currentTaskRunning = true;
                       // a custom context impl, used mainly to record the low and high watermarks
                        final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                                new SnapshotSplitChangeEventSourceContextImpl();
                       // run the read task
                        SnapshotResult snapshotResult =
                                splitSnapshotReadTask.execute(sourceContext);

                        final MySqlBinlogSplit backfillBinlogSplit =
                                createBackfillBinlogSplit(sourceContext);
                        // optimization that skips the binlog backfill when the low watermark
                        // equals the high watermark
                       // snapshot splits are read in parallel; if the low and high watermarks are identical after reading this split, no other writes happened during the read, so the split's range is unchanged and no binlog read is needed after the snapshot
                        final boolean binlogBackfillRequired =
                                backfillBinlogSplit
                                        .getEndingOffset()
                                        .isAfter(backfillBinlogSplit.getStartingOffset());
                        if (!binlogBackfillRequired) {
                            dispatchHighWatermark(backfillBinlogSplit);
                            currentTaskRunning = false;
                            return;
                        }

                        // after the snapshot finishes, start the binlog read task
                        if (snapshotResult.isCompletedOrSkipped()) {
                           // the snapshot read task recorded the low/high watermarks when it finished; the watermarks are passed in to build the binlog read task
                            final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                    createBackfillBinlogReadTask(backfillBinlogSplit);
                           // run the binlog read task; its internals are too involved to walk through here,
                           // so a quick outline: take the snapshot's high watermark as endOffset; the binlog
                           // read task reads every event below endOffset and sends it downstream
                            backfillBinlogReadTask.execute(
                                    new SnapshotBinlogSplitChangeEventSourceContextImpl());
                        } else {
                            readException =
                                    new IllegalStateException(
                                            String.format(
                                                    "Read snapshot for mysql split %s fail",
                                                    currentSnapshotSplit));
                        }
                    } catch (Exception e) {
                        currentTaskRunning = false;
                        LOG.error(
                                String.format(
                                        "Execute snapshot read task for mysql split %s fail",
                                        currentSnapshotSplit),
                                e);
                        readException = e;
                    }
                });
    }
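
// Recap of the snapshot-split protocol driven above (matching the event layout noted
// later in pollSplitRecords: [low watermark][snapshot events][high watermark][binlog events][binlog-end]):
//   1. record the low watermark (the current binlog offset)
//   2. scan the split's rows with a SELECT
//   3. record the high watermark
//   4. if high > low, replay the binlog between the two watermarks for this split (backfill)
//   5. dispatch an end watermark so pollSplitRecords knows the split is complete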


// -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)  ------------------------------------------
 
 @Override
    public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
        SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset); // just news one up
        final SnapshotContext ctx;
        try {
            ctx = prepare(context); // news up another context object; nothing interesting in it
        } catch (Exception e) {
            LOG.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }
        try {
           // the code above is scaffolding; the real work is in doExecute, so we go straight there
            return doExecute(context, ctx, snapshottingTask);
        } catch (InterruptedException e) {
            LOG.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception t) {
            throw new DebeziumException(t);
        }
    }

// -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)  ------------------------------------------

 @Override
    protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
       // a dispatcher used to record watermark events; data is later dispatched through it (and emitted by an emitter, of course)
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());
    // the log output below already tells the story
       // record the low watermark
        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

        LOG.info("Snapshot step 2 - Snapshotting data");
       // read the data; this is the key method to focus on
        createDataEvents(ctx, snapshotSplit.getTableId());
    // record the high watermark
        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);

        return SnapshotResult.completed(ctx.offset);
    }
 
// now let's look at the createDataEvents call chain

private void createDataEvents(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            TableId tableId)
            throws Exception {
        EventDispatcher.SnapshotReceiver snapshotReceiver =
                dispatcher.getSnapshotChangeEventReceiver();
        LOG.debug("Snapshotting table {}", tableId);
        createDataEventsForTable(
                snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
     // we won't read the receiver's code; a summary instead:
     // when the receiver gets a record via changeRecord it checks a member field (bufferedEvent): if it is non-null the buffered record is enqueued, and a new SourceRecord supplier takes its place; this repeats until all rows are read, so the field always holds the last record, and once no more data arrives changeRecord is never called again
     // completeSnapshot then checks bufferedEvent: if it is non-null it does the completion work and finally enqueues it; skip this call and the split's snapshot phase would come up one record short :)
        snapshotReceiver.completeSnapshot();
    }

// createDataEvents calls this class's createDataEventsForTable, where the concrete reading begins

    private void createDataEventsForTable(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            EventDispatcher.SnapshotReceiver snapshotReceiver,
            Table table)
            throws InterruptedException {

        long exportStart = clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
    
       // build the split scan SQL
        final String selectSql =
                StatementUtils.buildSplitScanQuery(
                        snapshotSplit.getTableId(),
                        snapshotSplit.getSplitKeyType(),
                        snapshotSplit.getSplitStart() == null,
                        snapshotSplit.getSplitEnd() == null);
        LOG.info(
                "For split '{}' of table {} using select statement: '{}'",
                snapshotSplit.splitId(),
                table.id(),
                selectSql);
    
        try (PreparedStatement selectStatement =
                        StatementUtils.readTableSplitDataStatement( // create the statement, then run the query
                                jdbcConnection,
                                selectSql,
                                snapshotSplit.getSplitStart() == null,
                                snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),
                                snapshotSplit.getSplitEnd(),
                                snapshotSplit.getSplitKeyType().getFieldCount(),
                                connectorConfig.getQueryFetchSize());
             // each row of the result set is wrapped into a SourceRecord and sent downstream
                ResultSet rs = selectStatement.executeQuery()) {

            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();

            while (rs.next()) {
                rows++;
                final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                for (int i = 0; i < columnArray.getColumns().length; i++) {
                    Column actualColumn = table.columns().get(i);
                    row[columnArray.getColumns()[i].position() - 1] =
                            readField(rs, i + 1, actualColumn, table);
                }
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOG.info(
                            "Exported {} records for split '{}' after {}",
                            rows,
                            snapshotSplit.splitId(),
                            Strings.duration(stop - exportStart));
                    snapshotProgressListener.rowsScanned(table.id(), rows);
                    logTimer = getTableScanLogTimer();
                }
                // here the row is put on the queue: the receiver takes the record and in turn puts it onto its own queue; no need to dig deeper, it is just wrapped in many layers and hard to follow
                dispatcher.dispatchSnapshotEvent(
                        table.id(),
                        getChangeRecordEmitter(snapshotContext, table.id(), row), // just news one up
                        snapshotReceiver);
            }
            LOG.info(
                    "Finished exporting {} records for split '{}', total duration '{}'",
                    rows,
                    snapshotSplit.splitId(),
                    Strings.duration(clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }
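
// For intuition only (a hypothetical shape, not output copied from the framework): with both
// bounds present, the statement built by StatementUtils.buildSplitScanQuery for a split keyed
// on `id` looks roughly like: SELECT * FROM `db`.`table` WHERE `id` >= ? AND `id` < ?
// the first split drops the lower bound, the last split drops the upper bound, and
// readTableSplitDataStatement binds the split's start/end keys to the placeholders.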


// -------------------------   flow after dispatcher.dispatchSnapshotEvent  ----------------------------------

 // step into EventDispatcher.dispatchSnapshotEvent
   public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
        }

        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
            @Override
            public void changeRecord(DataCollectionSchema schema,
                                     Operation operation,
                                     Object key, Struct value,
                                     OffsetContext offset,
                                     ConnectHeaders headers)
                    throws InterruptedException {
                eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
               // the actual enqueueing logic is invoked here
               // the receiver is the one we passed in, a BufferingSnapshotChangeRecordReceiver
                receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);
            }
        });
    }

  // BufferingSnapshotChangeRecordReceiver's changeRecord method
 // its handling was summarized above, so no further commentary is needed
  @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offsetContext,
                                 ConnectHeaders headers)
                throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");

            LOGGER.trace("Received change record for {} operation on key {}", operation, key);

            if (bufferedEvent != null) {
                queue.enqueue(bufferedEvent.get());
            }

            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());

            // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
            bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(
                        offsetContext.getPartition(),
                        offsetContext.getOffset(),
                        topicName, null,
                        keySchema, key,
                        dataCollectionSchema.getEnvelopeSchema().schema(), value,
                        null, headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
        }
2. The pollSplitRecords method, which pulls data from the queue

We have now traced how data is read and written into the queue; next, let's see when the reader pulls data out of that queue.

代码语言:javascript复制
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
        checkReadException();

        if (hasNextElement.get()) {
            // data input: [low watermark event][snapshot events][high watermark event][binlog
            // events][binlog-end event]
            // data output: [low watermark event][normalized events][high watermark event]
            boolean reachBinlogEnd = false;
            final List<SourceRecord> sourceRecords = new ArrayList<>();
            while (!reachBinlogEnd) {
               // queue.poll pulls the data directly; each event is checked against the ending watermark (which is the high watermark position), and once it is reached we can stop
                List<DataChangeEvent> batch = queue.poll();
                for (DataChangeEvent event : batch) {
                    sourceRecords.add(event.getRecord());
                    if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                        reachBinlogEnd = true;
                        break;
                    }
                }
            }
            // snapshot split return its data once
            hasNextElement.set(false);
            return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                    .iterator();
        }
        // the data has been polled, no more data
        reachEnd.compareAndSet(false, true);
        return null;
    }

The path from reading a record to putting it on a queue is now complete. A note on the queues: two are involved, queue and elementsQueue. queue belongs to the reading stage, where the read task deposits its records; in FetchTask, the reader's fetch method drains queue and puts the resulting batch into elementsQueue.
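
Condensing the whole hand-off chain (the names are the real ones discussed above):

代码语言:javascript复制
// Debezium read task thread:  readTask.execute() -> dispatcher/emitter -> queue.enqueue(event)
// SplitFetcher thread:        FetchTask.run() -> splitReader.fetch()
//                                 -> currentReader.pollSplitRecords() -> queue.poll()
//                                 -> elementsQueue.put(fetcherIndex, batch)
// Task main thread:           SourceReaderBase.pollNext()
//                                 -> elementsQueue.poll() -> recordEmitter.emitRecord(...) -> output.collect(...)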

With the data now sitting in elementsQueue, let's see when it gets sent downstream.

For this we need to go back to MySqlSource (the original post illustrates this step with a diagram).

When the reader was created we passed in a MySqlRecordEmitter, and records are later emitted through this class.

The logic that sends data downstream lives in MySqlSourceReader's parent class (SourceReaderBase), but the actual emission is done by the emitter.

Since those methods are driven by the framework, we won't trace them here; in short, the runtime calls SourceReaderBase.pollNext(), which triggers the collect of records and sends them to the downstream operators.

Here we read the MySqlRecordEmitter source to see its emission logic; by this point it is essentially the same as the SourceFunction-based implementation, so a quick pass is enough.

代码语言:javascript复制
public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {

    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();

    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final MySqlSourceReaderMetrics sourceReaderMetrics;
    private final boolean includeSchemaChanges;
    private final OutputCollector<T> outputCollector;

    public MySqlRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            MySqlSourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges) {
       // the object that deserializes records; it is the same class used by the single-parallelism implementation, so read its internals yourself if interested
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
    }

    @Override
    public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
            throws Exception {
      
        // check the event type: a data change record is sent downstream, other event kinds get their corresponding handling
        if (isWatermarkEvent(element)) {
            BinlogOffset watermark = getWatermark(element);
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = getBinlogPosition(element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }
 
    
    private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
       // the configured DeserializationSchema's deserialize method converts the record and sends it downstream
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }
  
    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;
        @Override
        public void collect(T record) {
            output.collect(record);
        }
    }
}
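
Since everything ultimately funnels through a DebeziumDeserializationSchema, here is what a minimal custom implementation looks like. This is a sketch that just emits record.toString(); flink-cdc itself ships a StringDebeziumDeserializationSchema along these lines, and the package prefix depends on the flink-cdc version:

代码语言:javascript复制
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

// Minimal sketch: turn every change event into its String form.
public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // a real implementation would unpack record.value() (a Struct) field by field
        out.collect(record.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}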

四. Aside <Table-related content>

代码语言:javascript复制
// the RowData DeserializationSchema; a brief look at the DeserializationSchema used above
public final class RowDataDebeziumDeserializeSchema
        implements DebeziumDeserializationSchema<RowData> {
      @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
       // depending on the operation type we need the corresponding side of the data:
       // 'after' is the row after the change, 'before' is the row before it
       // only update needs both before and after (have a think about why)
       // each operation is tagged with a RowKind; at the SQL layer the engine acts on this tag, e.g. for insert or update
       // so at the Table layer we only need to convert the data into RowData and set its RowKind; the framework
       // then performs the matching action at sink time, with no extra code from us
       // in short, to use CDC in SQL we attach the RowKind and need not care about anything downstream

       // when the format in SQL is json, I haven't traced the exact path taken since it wasn't my concern;
       // if you're curious, go look at the concrete implementation yourself
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
           // the RowData-building method is not included here, as its contents are fairly tedious
           // roughly: each column value is converted into the corresponding Flink-internal type
           // (Flink wraps all the types it uses, so every DB type can be represented)
            GenericRowData insert = extractAfterRow(value, valueSchema);
            validator.validate(insert, RowKind.INSERT);
            insert.setRowKind(RowKind.INSERT);
           // the actual emit-to-downstream method
            emit(record, insert, out);
        } else if (op == Envelope.Operation.DELETE) {
            GenericRowData delete = extractBeforeRow(value, valueSchema);
            validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            emit(record, delete, out);
        } else {
            GenericRowData before = extractBeforeRow(value, valueSchema);
            validator.validate(before, RowKind.UPDATE_BEFORE);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            emit(record, before, out);

            GenericRowData after = extractAfterRow(value, valueSchema);
            validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            emit(record, after, out);
        }
    }
  
}

To implement a table source of our own, we implement DynamicTableSourceFactory with the methods below and register it for dynamic loading via SPI (a minimal sketch follows the list):

  • createDynamicTableSource : create the concrete source
  • factoryIdentifier : the connector name we declare, e.g. kafka
  • requiredOptions : options that must appear after WITH, such as username and password; if one is missing, an exception is thrown
  • optionalOptions : optional settings after WITH, which may or may not be present
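
A minimal sketch under stated assumptions: the identifier "demo-cdc", the option names, and DemoCdcTableSource are all made up for illustration; the factory interface and the FactoryUtil helper calls are the real Flink Table API.

代码语言:javascript复制
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class DemoCdcTableSourceFactory implements DynamicTableSourceFactory {

    private static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();
    private static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().defaultValue(3306);

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate(); // rejects unknown keys and enforces requiredOptions()
        ReadableConfig options = helper.getOptions();
        return new DemoCdcTableSource(options.get(HOSTNAME), options.get(PORT)); // hypothetical source class
    }

    @Override
    public String factoryIdentifier() {
        return "demo-cdc"; // what users write in WITH ('connector' = 'demo-cdc')
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> required = new HashSet<>();
        required.add(HOSTNAME);
        return required;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optional = new HashSet<>();
        optional.add(PORT);
        return optional;
    }
}

For SPI discovery, the class's fully qualified name also goes into META-INF/services/org.apache.flink.table.factories.Factory.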
