前言
flink-cdc源码地址 : https://github.com/ververica/flink-cdc-connectors flink-cdc不再flink项目中,在flink1.11之后flink引入cdc功能,下面我们以源码深入了解flink-cdc实现原理, 我们主要以flink-cdc-mysql为主,其余代码基本差不太多 事先需要先简单了解一下debezium相关原理,flink-cdc是基于debezium实现的
一点建议 :
- 在阅读源码的时候,我们应该带着问题去思考,然后一步一步去阅读源码,在阅读源码的过程中,不要被一些不重要的点给占用过多的时间精力,并且一遍两遍是不会让我有一个清晰的印象的,毕竟别人多少年多少人的开发,看一两遍就可以理解的,在阅读某个框架源码之前,我们应该已经对该框架原理有一定的理解,然后根据我们的理解去验证他是代码实现的样子,或者带着思考去阅读,为什么这么实现,这么实现的好处是什么等.,其实代码都是一样的,只不过是每个人的实现方式不同,考虑的问题不同而已,
- 要有一定的java基础,熟悉多线程,了解开发使用的相关接口(或者自己看了介绍之后很容易理解),如果基础不牢,更多的是建议先从基础学习,然后写一写代码测试,比如多线程的时候怎么做交互等,自己写一写,在后面阅读源码的时候会更容理解里面内容
- 该内容要首先对cdc有一定的了解,知道cdc的相关原理,flink-cdc的实现基于debezium实现,该框架是开源的,可以先去了解一下,这样对于我们后面内容会更容易理解
谨记: 阅读的时候抓住重点!!!!!!!! , 不要被不重要的内容占用时间
一.项目结构(mysql-cdc为主)
1. 目录结构
- 带有test项目都是用于测试的项目
- 后缀带有cdc的表示一个database的连接器,区分sql与api形式
- flink-format-changelog-json : 用于解析json成RowData的模块
- flink-connector-debezium : 该模块封装debezium以及相关核心代码实现,并且修改了debezium的部分源码
- 每个项目中都有test目录,里面有相关的测试代码,可以自行测试代码debug
2. mysql项目源码包结构
- debezium : debezium用到的相关类
- schema : mysql schema(表结构)相关代码
- source : mysql-cdc source实现代码,包括全量读mysql,分割器,读取器等相关
- table : cdc table实现代码主要以table dynamic factory的实现
- resrouces : 该目录用与spi方式动态加载table factory,用于sql创建table找到对应的工厂类
二.mysql-cdc源码-SourceFucntion的单并行度的实现
- 基于RichSourceFunction的,单并行读取 1,11之前的source接口,已被标记Deprecated
- 基于Source的,多并行度,1.11之后新出的srouce接口,实现要更复杂
我们主要根据单并行度源码基于讲解,这样更方便理解
具体入手我们可以根据文档中的创建source的类来一点一点走
代码语言:javascript复制MySqlSource通过构建者模式(23种设计模式)构建,我们只需要知道我们可以设置哪些参数即可,这个比较容易理解
// 通过构建者方式配置任务启动时候所需要的参数
public static class Builder<T> {// MysqlSourcen内部类
private int port = 3306; // default 3306 port
private String hostname;
private String[] databaseList;
private String username;
private String password;
private Integer serverId;
private String serverTimeZone; //时区
private String[] tableList;
private Properties dbzProperties; // 传入的dbz引擎所需的属性
private StartupOptions startupOptions = StartupOptions.initial(); // 用于控制开始binlog开始消费位置的参数
private DebeziumDeserializationSchema<T> deserializer; // 用于对数据解析成什么样子如json,String等,定义序列化方式
//上面参数配置完成,通过build构建sourceFunctionn,主要将配置信息封装带properties中,这里面的参数主要是debezium所需要启动参数,配置信息等,如果想要了解可以去debezium官网查看参数的具体细节
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
// hard code server name, because we don't need to distinguish it, docs:
// Logical name that identifies and provides a namespace for the particular MySQL
// database
// server/cluster being monitored. The logical name should be unique across all other
// connectors,
// since it is used as a prefix for all Kafka topic names emanating from this connector.
// Only alphanumeric characters and underscores should be used.
props.setProperty("database.server.name", DATABASE_SERVER_NAME);
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.port", String.valueOf(port));
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
// debezium use "long" mode to handle unsigned bigint by default,
// but it'll cause lose of precise when the value is larger than 2^63,
// so use "precise" mode to avoid it.
props.put("bigint.unsigned.handling.mode", "precise");
if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }
if (databaseList != null) { props.setProperty("database.whitelist", String.join(",", databaseList)); }
if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}
if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); }
// 判断开始消费位置,在sqlSourceBuilder中构建的参数,没有则为null
DebeziumOffset specificOffset = null;
switch (startupOptions.startupMode) {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
break;
case EARLIEST_OFFSET:
props.setProperty("snapshot.mode", "never");
break;
case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
break;
case SPECIFIC_OFFSETS:
props.setProperty("snapshot.mode", "schema_only_recovery");
specificOffset = new DebeziumOffset();
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("server", DATABASE_SERVER_NAME);
specificOffset.setSourcePartition(sourcePartition);
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos);
specificOffset.setSourceOffset(sourceOffset);
break;
case TIMESTAMP:
checkNotNull(deserializer);
props.setProperty("snapshot.mode", "never");
deserializer =
new SeekBinlogToTimestampFilter<>(
startupOptions.startupTimestampMillis, deserializer);
break;
default:
throw new UnsupportedOperationException();
}
if (dbzProperties != null) {
props.putAll(dbzProperties);
// Add default configurations for compatibility when set the legacy mysql connector
// implementation
if (LEGACY_IMPLEMENTATION_VALUE.equals(
dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
props.put("transforms", "snapshotasinsert");
props.put(
"transforms.snapshotasinsert.type",
"io.debezium.connector.mysql.transforms.ReadToInsertEvent");
}
}
// 构建通用的cdc sourceFunction --> 基于richSourceFunction
return new DebeziumSourceFunction<>(
deserializer, props, specificOffset,
new MySqlValidator(props) // mysql校验器,版本信息,binlog是否为row等
);
}
}
上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现
代码语言:javascript复制// source代码,用于读取binlog,logminer等
// 实现richSourceFuntion完成source端代码的编写,实现ChecckpointFunction用于保证容错相关的内容,实现checkpointListener监听checkpoint的完成状态
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
// ------------------------------列出一些比较重要的成员变量,不重要的忽略了------------------------------------------
// ----------------------------------State-------------------------------------------------
/* 主要用于状态的维护,当任务出现问题重启/手动重启后,维护的一些schema(record中的结构) 未消费的records(在queue中,后面会看到) offset等信息 */
private transient volatile String restoredOffsetState;
private transient ListState<byte[]> offsetState;
private transient ListState<String> schemaRecordsState;
// -----------------------------------Worker-----------------------------------------------
/* 一个单线程的线程池,一个debeziumEngine(一个runnable的实现类)用与读取binlog数据
TODO 所以设计到多线程的交互*/
private transient ExecutorService executor;
private transient DebeziumEngine<?> engine;
/* 一个consumer,用于从engine中读取数据的消费者,并将数据放入handover中 */
private transient DebeziumChangeConsumer changeConsumer;
/* 用于从handover中拿取数据 */
private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
/* 两个线程(source,engine)之间交互数据的一个桥梁 */
private transient Handover handover;
// ----------------------------------------我们主要介绍srouce的run方法,其他方法主要用于容错相关--------------------------------------
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
// TODO 用于engine执行的一些相关参数,不是终点内容,如果感兴趣可官网看看说明
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
if (restoredOffsetState != null) {
properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
}
properties.setProperty("include.schema.changes", "false");
properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
properties.setProperty("tombstones.on.delete", "false");
if (engineInstanceName == null) {
engineInstanceName = UUID.randomUUID().toString();
}
properties.setProperty(
FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
properties.setProperty("database.history", determineDatabase().getCanonicalName());
String dbzHeartbeatPrefix =
properties.getProperty(
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
this.debeziumChangeFetcher =
new DebeziumChangeFetcher<>(
sourceContext,
deserializer,
restoredOffsetState == null, // 是否是快照阶段或者state==null?
dbzHeartbeatPrefix,
handover);
// 创建并配置engine相关参数
this.engine =
DebeziumEngine.create(Connect.class)
.using(properties)// 参数
.notifying(changeConsumer) // 配饰consumer消费 engine读取的数据(binlog/历史数据)
.using(OffsetCommitPolicy.always()) // offset的提交策略
.using(
(success, message, error) -> {
if (success) {
handover.close();
} else {
handover.reportError(error);
}
})
.build();
// 将engine任务提交到线程池中执行
executor.execute(engine);
debeziumStarted = true;
// mertic相关配置i
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
// ....
// 启动fetcher,循环去hanover中拿取最新数据发送下游
debeziumChangeFetcher.runFetchLoop();
}
}
上面我们已经看了source.run的基本实现,他的主要处理逻辑在DebeziumChangeConsumer,DebeziumChangeFetcher,Handover中
简单介绍三个类的作用和 主要 方法
和参数
DebeziumChangeConsumer : 用于消费engine读取的数据
代码语言:javascript复制/* 该类实现 DebeziumEngine.ChangeConsumer接口,实现handlerBatch方法 相对比较简单, 另外两个成员方法主要是offset相关,非重点内容*/
// engine线程会调用handleBatch方法出传递引擎消费到的数据
public class DebeziumChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
@Override
public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> events,
RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
currentCommitter = recordCommitter;
// 间接调用到handover的produce方法,该方法是阻塞的 嘻嘻嘻(如果有历史records未被消费则wait)
handover.produce(events);
} catch (Throwable e) {
// Hold this exception in handover and trigger the fetcher to exit
handover.reportError(e);
}
}
}
DebeziumChangeFetcher : 循环从handover中获取consumer从engine读取的最新数据
代码语言:javascript复制public class DebeziumChangeFetcher<T> {
private final SourceFunction.SourceContext<T> sourceContext;
/* 保证数据发送和状态更新的一把锁 */
private final Object checkpointLock;
/* 用于将数据转化成我们自定义的类型,如json,string等 */
private final DebeziumDeserializationSchema<T> deserialization;
/* 下面自定义的collector*/
private final DebeziumCollector debeziumCollector;
/* 见名知意,很好理解 */
private final DebeziumOffset debeziumOffset;
/* 用于存储在stateoffset的序列化器*/
private final DebeziumOffsetSerializer stateSerializer;
/* 心跳相关*/
private final String heartbeatTopicPrefix;
/* 是否恢复的状态,需要消费历史相关数据*/
private boolean isInDbSnapshotPhase;
private final Handover handover;
public void runFetchLoop() throws Exception {
try {
// 读取mysql历史的数据,不要被名字所迷惑
if (isInDbSnapshotPhase) {
List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
synchronized (checkpointLock) {
LOG.info(
"Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
handleBatch(events);
// 这里防止snapshot数据无法一次读取完毕,必须保证snapshot数据读取完毕才进入binlog的读取
while (isRunning && isInDbSnapshotPhase) {
handleBatch(handover.pollNext());
}
}
LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
}
// 到这里表示snapshot的数据读取完毕,开始实时读取binlog数据
while (isRunning) {
// 具体的处理数据逻辑 pollNext会阻塞
handleBatch(handover.pollNext());
}
} catch (Handover.ClosedException e) {
// ignore
}
private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
throws Exception {
if (CollectionUtils.isEmpty(changeEvents)) {
return;
}
this.processTime = System.currentTimeMillis();
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
// time相关基本都是metric相关内容,不必较真
updateMessageTimestamp(record);
fetchDelay = processTime - messageTimestamp;
// 通过心跳机制来更新offset
if (isHeartbeatEvent(record)) {
synchronized (checkpointLock) {
debeziumOffset.setSourcePartition(record.sourcePartition());
debeziumOffset.setSourceOffset(record.sourceOffset());
}
continue;
}
// 根据不同的deserialization对数据做转换,---> 可以看这个,比较容易理解StringDebeziumDeserializationSchema, 内部直接 record.toString即可,就是将debezium读取的record转换成我们想要的格式或者类型,debeziumCollector 就是下面自定义的collector,在deserialize中,会将转换完成的数据放入queue中
deserialization.deserialize(record, debeziumCollector);
// 判断数据是否为snapshot的最后一条数据,如果是则在这条数据之后转换到binlog的streaming流程
if (!isSnapshotRecord(record)) {
LOG.debug("Snapshot phase finishes.");
isInDbSnapshotPhase = false;// runFetchLoop方法中使用
}
// 具体发送数据
emitRecordsUnderCheckpointLock(
debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
}
}
private void emitRecordsUnderCheckpointLock(
Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
// 同步是保证数据的发送和offset的更新是安全,lock是可重入的(不懂可以百度,java基础内容)
synchronized (checkpointLock) {
T record;
// 循环debeziumCollector的records队列,将队列中的数据依次发送到下游,
while ((record = records.poll()) != null) {
emitDelay = System.currentTimeMillis() - messageTimestamp;
// 通过source的context对象将其发送到下游operator,这里转入了flink的处理逻辑,不再cdc代码之内
sourceContext.collect(record);
}
debeziumOffset.setSourcePartition(sourcePartition);
debeziumOffset.setSourceOffset(sourceOffset);
}
}
// 心跳机制 ,用于更新offset的机制
private boolean isHeartbeatEvent(SourceRecord record) {
String topic = record.topic();
return topic != null && topic.startsWith(heartbeatTopicPrefix);
}
// --------------------------------自定义collector-------------------------------------------------------
private class DebeziumCollector implements Collector<T> {
private final Queue<T> records = new ArrayDeque<>();
@Override
public void collect(T record) {
// 将数据放入队列,queue会在别的地方进出列将数据发送下游
records.add(record);
}
}
}
Handover : source线程和engine线程执行中数据交互桥梁
代码语言:javascript复制/* 这个类由两个线程访问, pollNext由debeziumFetcher调用,produce有debeziumConsumer调用,因为涉及多线程的调用,单纯的讲代码可能不容易理解,可以去复习一下java多线程知识内容,或者自己debug一下看看调用流程就比较容易理解了 */
@ThreadSafe //表示类是线程安全的,这类涉及engine和source线程两个线程操作,内部的实现保证了线程安全
public class Handover implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
private final Object lock = new Object();
@GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注
private List<ChangeEvent<SourceRecord, SourceRecord>> next;
@GuardedBy("lock")
private Throwable error;
private boolean wakeupProducer;
/* debeziumFetcher 调用,当没有数据的时候进入wait状态,wait状态的时候cpu是不会调用wait状态的线程,另一个线程就可以占用cpu的全部时间片*/
public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
// 同步代码块才可以使用wait和notifyAll,为什么使用这种方式,因为只有两个线程,所以这种方式实现简单,如果线程多可以通过juc的lock去做或者其他方式也可以
synchronized (lock) {
// 没有数据没有异常则持续循环进入wait状态,为了防止虚假唤醒的情况
while (next == null && error == null) {
lock.wait();
}
List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
// 上面的循环可以退出的时候,说明一定是有数据或者有异常,不存在其他的情况
if (n != null) {
// 将next置为null 下面会根据此条件作为判断条件
next = null;
// 唤醒其他等待线程,当然只可能是engine线程
lock.notifyAll();
return n;
} else {
// 将异常抛出
ExceptionUtils.rethrowException(error, error.getMessage());
// 上面方法一定会抛出异常,改代码只是为了去掉编译警告...
return Collections.emptyList();
}
}
}
public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
throws InterruptedException {
checkNotNull(element);
synchronized (lock) {
// next不等一直进入wait状态
while (next != null && !wakeupProducer) {
lock.wait();
}
wakeupProducer = false;
// 有异常抛出异常,没异常将接受新数据,并唤醒fetcher线程
if (error != null) {
ExceptionUtils.rethrow(error, error.getMessage());
} else {
next = element;
lock.notifyAll();
}
}
}
}
上面代码即是基于RichSourceFunction实现的cdc主要
代码,其实不算难,但是前人写的代码是已经把很多问题已经考虑进入,对代码的抽象也很好,扩展起来很方便,api设计对与我们开发者来说很容易使用
三.mysql-cdc源码-新Source接口的实现
1.11版本之后flink提供了新的source接口,可以提前预习一波https://issues.apache.org/jira/browse/FLINK-10740
代码语言:javascript复制简单介绍一下 SourceReader : 对split的的数据进行读取操作,比如: 读取一个分区,一个块等,当然不只局限与一个分区,根据自己的实现来 SplitEnumerator : 负责对数据源进行切分或者发现分区等,比如: 发现kafka的分区,对文件划分块等 上述的比较简单,实际上比这复杂一点,所以在新的source接口实现一个source是比较难的事情,不过熟悉之后都一样 提前说明 : 一个split我们可以认为是一个切片,mysql-cdc中, 假想情况下 : 一张的一部分中, 比如 开始主键 1 到 结束主键 10 ,那么该split就表示这些数据,在具体读取数据的时候是有readTask来去读,那么他就会通过split标记的点位来进行数据的读取,当然一个readTask不止会执行一个split; snapshot表示的是读取数据库的历史全量数据 binlog 表示当我们snapshot阶段结束后开始binlog阶段,即我们开始读取的binlog数据了 先执行snapshot阶段,后执行binlog阶段
代码的生成和旧版是相同的,只不过是内部执行的逻辑存在变化,新的source接口实现的cdc代码比较复杂,涉及的内容比较多,可能比较晕,后面自己可以根据源码debug走一走 由于代码过多,主要讲解重点的内容,不重要的跳过了
// 实现了两个接口 source,和 resultTypeQueryable(比较简单就一个获取结果类型信息的接口) , 主要代码还是在source接口的实现
// T 为输出类型,MySqlSplit是mysql的分割器,PendingSplitsState表示Enumerator的状态对象
public class MySqlSource<T>
implements Source<T, MySqlSplit, PendingSplitsState>, ResultTypeQueryable<T> {
private final MySqlSourceConfigFactory configFactory;
private final DebeziumDeserializationSchema<T> deserializationSchema;
/* 通过构造者模式构建source所需要的参数,简单说明一下,里面的参数,通过MySqlSourceConfigFactory添加参数,在build方法中,将factory作为参数构建出MySqlSource
-------------------------------------讲解一下对应关系------------------------------------------------
MySqlSourceConfigFactory 可以根据不同的subtask创建对应的MySqlSourceConfig
MySqlSourceConfig 可以构建 MySqlConnectorConfig
MySqlConnection 通过 DebeziumUtil.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())方法构建
上面的一个config比较混乱,名字也比较不容易理解,后面用到的时候会简单提一下,这里主要是有一个印象,不要被一些配置搞混
*/
public static <T> MySqlSourceBuilder<T> builder() {
return new MySqlSourceBuilder<>();
}
// 由MySqlSourceBuilder.build方法创建
MySqlSource(
MySqlSourceConfigFactory configFactory,
DebeziumDeserializationSchema<T> deserializationSchema // 与老版source的deserialization一样
) {
this.configFactory = configFactory;
this.deserializationSchema = deserializationSchema;
}
@Override // 流批一体的source,表示有界性,新source接口的特性
public Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED; }
/*构建sourceReader */
@Override
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
throws Exception {
// 前面提到了,根据subtask索引创建对应的config
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
// 一个阻塞队列,多线程交互用的,不必深入
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
// metric相关
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
// 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可
Supplier<MySqlSplitReader> splitReaderSupplier =
// 拿到每个reader的config和对应的subtask index
() -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
// 构建了一个具体的sourceReader
return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
readerContext,
sourceConfig);
}
@Override
public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
SplitEnumeratorContext<MySqlSplit> enumContext) {
// 因为只会生成一次所以生成一个sourceConfig即可
MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
// 检验mysql
final MySqlValidator validator = new MySqlValidator(sourceConfig);
validator.validate();
final MySqlSplitAssigner splitAssigner;
// 判断开始条件如果是initial则先读取mysql table的数据(代码中叫做snapshot),然后再继续读取binlog的数据,如果不是initial状态,则直接从binlog开始读取
if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> remainingTables = discoverCapturedTables(jdbc, sourceConfig);
boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
splitAssigner =
// 里面包含 snapshot和binlog的split逻辑
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
remainingTables,
isTableIdCaseSensitive);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
}
} else {
// 之有binlog的split逻辑
splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
}
// 创建对应发的SplitEnumerator,用于构建split给reader读取
return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
}
// 恢复SplitEnumerato,比如任务故障重启,会根据不同的checkpoint恢复SplitEnumerator,用于继续之前未完成的读取操作
@Override
public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
final MySqlSplitAssigner splitAssigner;
if (checkpoint instanceof HybridPendingSplitsState) {
splitAssigner =
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner( sourceConfig, (BinlogPendingSplitsState) checkpoint);
} else {
throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " checkpoint);
}
return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
}
// -----------------容错相关,不是重点-----------------
@Override
public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() { return MySqlSplitSerializer.INSTANCE; }
@Override
public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() { return new PendingSplitsStateSerializer(getSplitSerializer());}
// 返回值类型的提取
@Override
public TypeInformation<T> getProducedType() {return deserializationSchema.getProducedType();}
}
上面的代码中我们可以看到source的实现,主要是构建sourceReader和splitEnumerator,以及容错内容,相关的处理逻辑也封装在相应的对象中,下面我们对其内部逐步剖析
代码语言:javascript复制/*在看其他内容之前,我们可以看看如何对mysql进行split操作,在snapshot是通过主键来split的,binlog的只从当前offset位置开始消费,
这里是混合的一个split,另外还存在binlog和snapshot的splitAssigner,不过我们根据主要看看大致逻辑,具体到某一直可以自己阅读理解,
解释一下 : 先读取mysql历史数据即snapshot阶段,然后再进行当前mysql-binlog的位置开始消费,所以这个混合的意义就是先读取全量数据,然后从最新的binlog开始读取,完成cdc读取数据的过程*/
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
private final int splitMetaGroupSize;
private boolean isBinlogSplitAssigned;
private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
this(
// 创建snapshot split
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
false,
sourceConfig.getSplitMetaGroupSize());
}
public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
HybridPendingSplitsState checkpoint) {
this(
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
checkpoint.isBinlogSplitAssigned(),
sourceConfig.getSplitMetaGroupSize());
}
private MySqlHybridSplitAssigner(
MySqlSnapshotSplitAssigner snapshotSplitAssigner,
boolean isBinlogSplitAssigned,
int splitMetaGroupSize) {
this.snapshotSplitAssigner = snapshotSplitAssigner;
this.isBinlogSplitAssigned = isBinlogSplitAssigned;
this.splitMetaGroupSize = splitMetaGroupSize;
}
@Override
public void open() {
snapshotSplitAssigner.open();
}
// 主要返回下一个split,没有则返回一个空, optional可以jdk8的新特性,用于解决空指针的一个类
@Override
public Optional<MySqlSplit> getNext() {
// 下面的方法可以见名知意,自行理解即可
if (snapshotSplitAssigner.noMoreSplits()) {
if (isBinlogSplitAssigned) {
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) { // 当snapshot完成后,开始binlog的split流程
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;
return Optional.of(createBinlogSplit());
} else {
// binlog split is not ready by now
return Optional.empty();
}
} else {
// snapshot assigner still have remaining splits, assign split from it
return snapshotSplitAssigner.getNext();
}
}
// splitAssigner是否在等待已完成split回调,即onFinishedSplits
@Override
public boolean waitingForFinishedSplits() {
return snapshotSplitAssigner.waitingForFinishedSplits();
}
// 获取已完成的split并且包含他的元数据,可以根据已经完成snapshot(snapshot的某一个split)生成对应binlog的split
@Override
public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
return snapshotSplitAssigner.getFinishedSplitInfos();
}
// 使用已完成的binlog偏移量来处理已完成的split,用于确定何时生成binlog split以及生成什么binlog split,就是回调
@Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
}
// 向此splitAssigner添加一组spilt,当某些split处理失败,则需要重新添加分割时调用此方法
@Override
public void addSplits(Collection<MySqlSplit> splits) {
List<MySqlSplit> snapshotSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
if (split.isSnapshotSplit()) {
snapshotSplits.add(split);
} else {
// we don't store the split, but will re-create binlog split later
isBinlogSplitAssigned = false;
}
}
snapshotSplitAssigner.addSplits(snapshotSplits);
}
// ----------------------------checkpoint 容错相关----------------------------------------
@Override
public PendingSplitsState snapshotState(long checkpointId) {
return new HybridPendingSplitsState(
snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
}
@Override
public void close() {
snapshotSplitAssigner.close();
}
// ------------------------------------binlog split部分-------------------------------------------
// 构建biglog split, 就是根据已经完成snapshot split来构建binlog split的一个过程,split代码比较简单可以自行阅读
// 简单介绍一下 就是描述binlog的split,snapshot的split相关内容,比如snapshot,会按照主键去做split,已经table的schemas相关信息
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSnapshotSplit> assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
Map<String, BinlogOffset> splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
BinlogOffset minBinlogOffset = null;
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
}
boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
BinlogOffset.NO_STOPPING_OFFSET,
divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
new HashMap<>(),
finishedSnapshotSplitInfos.size());
}
}
现在我们开始介绍sourceReader和SplitEnumerator
sourceReader :
代码语言:javascript复制
/* SingleThreadMultiplexSourceReaderBase */
public class MySqlSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
private final MySqlSourceConfig sourceConfig;
private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
private final int subtaskId;
public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
Supplier<MySqlSplitReader> splitReaderSupplier,
RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
MySqlSourceConfig sourceConfig) {
super(
elementQueue,
// 一个单线程的fetcher管理器,做一些读取操作
// 简单描述一下流程
// SingleThreadFetcherManager.createSplitFetcher 构建一个SplitFetcher(实现了Runnable),在SplitFetcher中会构建一个fetcherTask,SplitFetcher.run方法中,循环调用this.runOnce(),this.runOnce()会持续调用fetcherTask.run()读取数据,run()会调用MySqlSplitReader.fetch方法,返回reader读取的数据,并将数据放入到elementQueue中,只要涉及都多线程的代码,都比较晦涩难懂
new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
recordEmitter,
config,
context);
this.sourceConfig = sourceConfig;
this.finishedUnackedSplits = new HashMap<>();
this.uncompletedBinlogSplits = new HashMap<>();
this.subtaskId = context.getIndexOfSubtask();
}
// 启动reader
@Override
public void start() {
if (getNumberOfCurrentlyAssignedSplits() == 0) {
// 发送split的请求到splitEnumerator,会调用到SplitEnumerator.handleSplitRequest(int, String)方法,会带这并行的reader的subtask id 和hostname
context.sendSplitRequest();
}
}
// 当reader分配到新的split的时候,会初始化一个split的state
@Override
protected MySqlSplitState initializedState(MySqlSplit split) {
if (split.isSnapshotSplit()) {
return new MySqlSnapshotSplitState(split.asSnapshotSplit());
} else {
return new MySqlBinlogSplitState(split.asBinlogSplit());
}
}
@Override // 容错相关, skip
public List<MySqlSplit> snapshotState(long checkpointId) {
// unfinished splits
List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
// add finished snapshot splits that didn't receive ack yet
stateSplits.addAll(finishedUnackedSplits.values());
// add binlog splits who are uncompleted
stateSplits.addAll(uncompletedBinlogSplits.values());
return stateSplits;
}
// 清理处理已完成的split状态,非重点
@Override
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
checkState(
mySqlSplit.isSnapshotSplit(),
String.format(
"Only snapshot split could finish, but the actual split is binlog split %s",
mySqlSplit));
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
reportFinishedSnapshotSplitsIfNeed();
context.sendSplitRequest();
}
/*添加此reader要read的split列表,当splitEnumerator通过splitEnumeratorContext分配一个splut时,将调用此方法
即调用 context.assignSplit(SourceSplit, int) 或者 context.assignSplits(SplitsAssignment).
*/
@Override
public void addSplits(List<MySqlSplit> splits) {
List<MySqlSplit> unfinishedSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
// 判断是否是snapshot还是binlog split
if (split.isSnapshotSplit()) {
// 如果split已经read完成放入完成集合,否则放入未完成的集合中
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
} else {
unfinishedSplits.add(split);
}
} else {
if (!split.asBinlogSplit().isCompletedSplit()) {
//如果binlog split未完成则加入未完成的列表中,并想spluitEnumerator发送请求binlog split meta的事件
uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
} else {
// 未完成的split集合删除该split ,未完成的集合表示没有split meta信息
uncompletedBinlogSplits.remove(split.splitId());
// 创建binlog split, 带有table schema信息
MySqlBinlogSplit mySqlBinlogSplit =
discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
// 添加到未完成的splits,后续会进行read操作
unfinishedSplits.add(mySqlBinlogSplit);
}
}
}
// notify split enumerator again about the finished unacked snapshot splits
reportFinishedSnapshotSplitsIfNeed();
// add all un-finished splits (including binlog split) to SourceReaderBase
// TODO 当调用spuer.addSplits的时候,会启动fetcherManager,开始读取数据的操作
super.addSplits(unfinishedSplits);
}
private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
final String splitId = split.splitId();
// 如果tableSchema不存在则填充,如果已经存在,则直接返回split即可
if (split.getTableSchemas().isEmpty()) {
try (MySqlConnection jdbc =
// 静态方法,构建一个mysqlConnection,可以认为就是一个jdbc连接 ,不必深入
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
Map<TableId, TableChanges.TableChange> tableSchemas =
// 静态方法,根据我们sourceBuilder构建的时候给定的database和tablelist来构建对应的tableId和TableChange,然后我们在面read的时候需要, 不必深入工具类
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
LOG.info("The table schema discovery for binlog split {} success", splitId);
// 静态方法,构建一个带有tableSchema的MysqlBinlogSpilt,不必深入
return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
} catch (SQLException e) {
LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
throw new FlinkRuntimeException(e);
}
} else {
LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);
return split;
}
}
// 处理source自定义事件,接收来自splitEumumerator,与splitEumumerator类似
@Override
public void handleSourceEvents(SourceEvent sourceEvent) {
if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
LOG.debug(
"The subtask {} receives ack event for {} from enumerator.",
subtaskId,
ackEvent.getFinishedSplits());
for (String splitId : ackEvent.getFinishedSplits()) {
this.finishedUnackedSplits.remove(splitId);
}
} else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
// report finished snapshot splits
LOG.debug(
"The subtask {} receives request to report finished snapshot splits.",
subtaskId);
reportFinishedSnapshotSplitsIfNeed();
} else if (sourceEvent instanceof BinlogSplitMetaEvent) {
LOG.debug(
"The subtask {} receives binlog meta with group id {}.",
subtaskId,
((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
} else {
super.handleSourceEvents(sourceEvent);
}
}
// 发送请求binlogSplit meta的事件
private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
final String splitId = binlogSplit.splitId();
if (!binlogSplit.isCompletedSplit()) {
final int nextMetaGroupId =
getNextMetaGroupId(
binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
BinlogSplitMetaRequestEvent splitMetaRequestEvent =
new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
context.sendSourceEventToCoordinator(splitMetaRequestEvent);
} else {
LOG.info("The meta of binlog split {} has been collected success", splitId);
this.addSplits(Arrays.asList(binlogSplit));
}
}
// 我们发送了请求meta的event后,会收到binlog split meta,我们需要填充至binlogSplit中
private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
if (binlogSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
final int expectedMetaGroupId =
getNextMetaGroupId(
binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
if (receivedMetaGroupId == expectedMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaDataGroup =
metadataEvent.getMetaGroup().stream()
.map(FinishedSnapshotSplitInfo::deserialize)
.collect(Collectors.toList());
uncompletedBinlogSplits.put(
binlogSplit.splitId(),
MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
} else {
LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",metadataEvent.getSplitId(), receivedMetaGroupId,expectedMetaGroupId);
}
// 继续发送请求meta event
requestBinlogSplitMetaIfNeeded(binlogSplit);
} else {
LOG.warn( "Received binlog meta event for split {}, but the uncompleted split map does not contain it", metadataEvent.getSplitId());
}
}
// state变成不可变的state
@Override
protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); }
}
splitEnumerator :
- 处理sourceReader的split请求
- 将split分配给sourceReader
// 继承SplitEnumerator,并重写其方法
public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState> {
private static final long CHECK_EVENT_INTERVAL = 30_000L;
private final SplitEnumeratorContext<MySqlSplit> context;
private final MySqlSourceConfig sourceConfig;
private final MySqlSplitAssigner splitAssigner;
// using TreeSet to prefer assigning binlog split to task-0 for easier debug
private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context,
MySqlSourceConfig sourceConfig,
MySqlSplitAssigner splitAssigner) {
// source.createEnumerator传入的context对象
this.context = context;
this.sourceConfig = sourceConfig;
this.splitAssigner = splitAssigner;
this.readersAwaitingSplit = new TreeSet<>();
}
@Override
public void start() {
splitAssigner.open(); //调用splitAssigner的open方法,可以具体看看每个splitAssigner的实现
// 注册一个Callable,定期调用,主要的作用就是当reader出现通信失败或者故障重启之后,检查是否有错过的通知时间,不是终点
this.context.callAsync(
this::getRegisteredReader,
this::syncWithReaders,
CHECK_EVENT_INTERVAL,
CHECK_EVENT_INTERVAL);
}
// 处理split的请求,当有具体给定子subtask id的reader调用SourceReaderContext.sendSplitRequest()方法时,将调用此方法。
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
// 将请求的taskId放入等待列表
readersAwaitingSplit.add(subtaskId);
// 对等待列表的subtask进行fen'pei
assignSplits();
}
// 将split添加至splitEnumerator,只有在最后一个成功的checkpoint之后,分配的spilt才会出现此情况,说明需要重新处理.
@Override
public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
splitAssigner.addSplits(splits);
}
// 处理sourceReader的自定义event
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
// sourceReader发送给splitEnumerator的SourceEvent通知snapshot的split已经读取完成,binlog的位置是一致的
if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
LOG.info(
"The enumerator receives finished split offsets {} from subtask {}.",
sourceEvent,
subtaskId);
FinishedSnapshotSplitsReportEvent reportEvent =
(FinishedSnapshotSplitsReportEvent) sourceEvent;
Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();
// 上面splitAssigner介绍过
splitAssigner.onFinishedSplits(finishedOffsets);
// 返回ACK事件返回给redaer的表示已经确认了snapshot
FinishedSnapshotSplitsAckEvent ackEvent =
new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));
context.sendEventToSourceReader(subtaskId, ackEvent);
}
// sourceReader发送给splitEnumerator的SourceEvent用来拉取binlog元数据,也就是发送BinlogSplitMetaEvent
else if (sourceEvent instanceof BinlogSplitMetaRequestEvent) {
LOG.debug(
"The enumerator receives request for binlog split meta from subtask {}.",
subtaskId);
// 发送binlog meta
sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
}
}
@Override
public PendingSplitsState snapshotState(long checkpointId) {
return splitAssigner.snapshotState(checkpointId);
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
splitAssigner.notifyCheckpointComplete(checkpointId);
// binlog split may be available after checkpoint complete
assignSplits();
}
// ------------------------------------------------------------------------------------------
// 为等待列表的subtask分配
private void assignSplits() {
// treeSet返回的iter是排好序的,即按照subtask id顺序依次处理
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// 如果reader再次请求的split在此期间失败,则将其从等待列表中删除
if (!context.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}
Optional<MySqlSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
final MySqlSplit mySqlSplit = split.get();
// 为subtask分配split
context.assignSplit(mySqlSplit, nextAwaiting);
awaitingReader.remove();
LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
} else {
// there is no available splits by now, skip assigning
// 前面splitAssigner中会分配空值,在这里被过滤掉
break;
}
}
}
// 发送给binlog meta event到reader
private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
// 如果binlog meta ==null 则进行meta的初始化操作
if (binlogSplitMeta == null) {
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
splitAssigner.getFinishedSplitInfos();
if (finishedSnapshotSplitInfos.isEmpty()) {
LOG.error(
"The assigner offer empty finished split information, this should not happen");
throw new FlinkRuntimeException(
"The assigner offer empty finished split information, this should not happen");
}
binlogSplitMeta =
Lists.partition(
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
if (binlogSplitMeta.size() > requestMetaGroupId) {
// 获取对应的FinishedSnapshotSplitInfo列表,并将其封序列化,生成meta event
List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
.collect(Collectors.toList()));
// 将生成的meta evnet 发送给reader
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
LOG.error(
"Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
requestMetaGroupId,
binlogSplitMeta.size() - 1);
}
}
}
上面两个类中我们没有看到具体的读数据逻辑,实际上当系统调用addSplit()的时候就开始启动任务了,由于调用链比较长,为了方便观看,我这里直接截图看代码,看看代码是怎么开始进入执行逻辑的,
- sourceReader中创建的fetcherManager,存入父类成员变量中
- 当sourceReader调用addSplits的会调用父类的addSplits方法
- 调用我们传入的fetcherManager的addSplits方法
- 调用fetcherManager的addSplits方法时,子类没有覆写父类方法,直接进入父类方法,这里直接进入父类的splits方法,如果fetcher没有启动,则创建fetcher(一个runnable对象),然后提交到线程池执行任务
上面可以看到我们的fetcher已经启动了,那我们就看看fetcher具体做了什么样子的事情(要记住上面传入了一个队列,fetcher中读取的数据会放入队列中),createFetcher时候,实际是创建的SplitFetcher,有flink新source中提供类
代码语言:javascript复制/*由于SplitFetcher是一个runnable对象,所以我们直接进入run方法看看做了什么即可
先介绍一下流程 :
1. 当构建fetcher的时候在构造方法中,我们传递了一个splitReader,这个是负责真实读取数据的(实际上是mysqlSplitReader)
2. fetcher构造方法中构建了一个FetcherTask,run之后会开始task的执行,如果还记得的话 我们在startFetcher()之后调用了一个fetcher的addSplit方法,该方法会将splits构建成tasks加入的taskQueue
3. 里面会有一些空闲,唤醒等不重要的逻辑,我给删除掉了,不重要,不要占用过多时间,因为非cdc内容
*/
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
private final int id;
private final BlockingDeque<SplitFetcherTask> taskQueue;
// track the assigned splits so we can suspend the reader when there is no splits assigned.
private final Map<String, SplitT> assignedSplits;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final SplitReader<E, SplitT> splitReader;
private final Consumer<Throwable> errorHandler;
private final Runnable shutdownHook;
private final AtomicBoolean wakeUp;
private final AtomicBoolean closed;
private final FetchTask<E, SplitT> fetchTask;
private volatile SplitFetcherTask runningTask = null;
private final Object lock = new Object();
SplitFetcher(
int id,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitReader<E, SplitT> splitReader,
Consumer<Throwable> errorHandler,
Runnable shutdownHook,
Consumer<Collection<String>> splitFinishedHook) {
this.id = id;
// task队列,包含WAKEUP_TASK(特定情况下唤醒fetcher线程用),以及我们读取任务的task
this.taskQueue = new LinkedBlockingDeque<>();
// 读取的数据会放入该队列
this.elementsQueue = elementsQueue;
this.assignedSplits = new HashMap<>();
this.splitReader = splitReader;
this.errorHandler = errorHandler;
this.shutdownHook = shutdownHook;
this.isIdle = true;
this.wakeUp = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
// 对传入的splitReader封装到fetcherTask,以便任务启动的时候直接执行任务
this.fetchTask =
new FetchTask<>(
splitReader,
elementsQueue,
ids -> {
ids.forEach(assignedSplits::remove);
splitFinishedHook.accept(ids);
LOG.info("Finished reading from splits {}", ids);
},
id);
}
@Override
public void run() {
LOG.info("Starting split fetcher {}", id);
try {
while (!closed.get()) {
// 每次循环的距离逻辑
runOnce();
}
} catch (Throwable t) {
errorHandler.accept(t);
} finally {
try {
splitReader.close();
} catch (Exception e) {
errorHandler.accept(e);
}
LOG.info("Split fetcher {} exited.", id);
// This executes after possible errorHandler.accept(t). If these operations bear
// a happens-before relation, then we can checking side effect of errorHandler.accept(t)
// to know whether it happened after observing side effect of shutdownHook.run().
shutdownHook.run();
}
}
/** Package private method to help unit test. */
void runOnce() {
try {
if (shouldRunFetchTask()) {
runningTask = fetchTask;
} else {
runningTask = taskQueue.take();
}
LOG.debug("Prepare to run {}", runningTask);
// 这里运行task,我们下面直接去task中看看具体的操作逻辑即可
if (!wakeUp.get() && runningTask.run()) {
LOG.debug("Finished running task {}", runningTask);
// the task has finished running. Set it to null so it won't be enqueued.
runningTask = null;
checkAndSetIdle();
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"SplitFetcher thread %d received unexpected exception while polling the records",
id),
e);
}
// If the task is not null that means this task needs to be re-executed. This only
// happens when the task is the fetching task or the task was interrupted.
maybeEnqueueTask(runningTask);
synchronized (wakeUp) {
// Set the running task to null. It is necessary for the shutdown method to avoid
// unnecessarily interrupt the running task.
runningTask = null;
// Set the wakeUp flag to false.
wakeUp.set(false);
LOG.debug("Cleaned wakeup flag.");
}
}
/* 在fetcher创建的时候调用了该方法,或者已经运行之后调用的该方法在上面截图的流程中有代码 */
public void addSplits(List<SplitT> splitsToAdd) {
enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
wakeUp(true);
}
public void enqueueTask(SplitFetcherTask task) {
synchronized (lock) {
taskQueue.offer(task);
isIdle = false;
}
}
}
我们进入fetcherTask中,只看task逻辑即可
代码语言:javascript复制class FetcherTask{
@Override
public boolean run() throws IOException {
try {
if (!isWakenUp() && lastRecords == null) {
// 返回的是该对象 public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord>
// 调用了我们在创建fetcherTask的时候传入的splitReader对象,实际上还是调用reader的fetch方法来真正的获取数据
lastRecords = splitReader.fetch();
}
if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
// 将读取的数据放入到队列汇总
if (elementsQueue.put(fetcherIndex, lastRecords)) {
if (!lastRecords.finishedSplits().isEmpty()) {
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
}
lastRecords = null;
}
}
} catch (InterruptedException e) {
throw new IOException("Source fetch execution was interrupted", e);
if (isWakenUp()) {
wakeup = false;
}
}
return true;
}
}
通过上面我们基本上已经清楚了在flink层面是怎么最终调用了cdc读取数据的代码,现在我们根据主要的读取代码看看是怎么样子实现的
代码语言:javascript复制currentReader.pollSplitRecords() ,我们简单介绍一下currentReader(BinlogSplitReader/SnapshotSplitReader)主要两种实现,大概的思路这里面会根据不同的性质区分进行读取数据,在submitSplit的时候会创建readRask读取指定split的数据,结果会放入StatefulTaskContext的queue中,在fetch方法会先提交split,让其执行read数据,然后通过pollSplitRecords方法在调用queue.poll拉取数据,这是一个阻塞的操作,如果超时则抛出中断异常
public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
private final Queue<MySqlSplit> splits;
private final MySqlSourceConfig sourceConfig;
private final int subtaskId;
@Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
@Nullable private String currentSplitId;
@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
// 执行fetch的时候提前检查一下currentReader,并根据不同的split创建不同的对应的reader,binlog/snapshot
checkSplitOrStartNext();
Iterator<SourceRecord> dataIt = null;
try {
// 调用具体的debeziumReader执行任务
// 在reader中会调用StatefulTaskContext的queue的poll方法拉取数据,该方法会阻塞(也可以根据时间阻塞),如果时间间隔内没有返回数据则被中断,抛出InterruptedException
dataIt = currentReader.pollSplitRecords();
} catch (InterruptedException e) {
LOG.warn("fetch data failed.", e);
throw new IOException(e);
}
return dataIt == null
? finishedSnapshotSplit() // 如果没有读取到数据则返回一个空的,该方法执行后会将currentSplitId置位null,表示已经该split执行完成
: MySqlRecords.forRecords(currentSplitId, dataIt);
}
@Override
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
if (!(splitsChanges instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChanges.getClass()));
}
LOG.debug("Handling split change {}", splitsChanges);
splits.addAll(splitsChanges.splits());
}
private void checkSplitOrStartNext() throws IOException {
// the binlog reader should keep alive
if (currentReader instanceof BinlogSplitReader) {
return;
}
if (canAssignNextSplit()) {
final MySqlSplit nextSplit = splits.poll();
if (nextSplit == null) {
throw new IOException("Cannot fetch from another split - no split remaining");
}
currentSplitId = nextSplit.splitId();
if (nextSplit.isSnapshotSplit()) {
if (currentReader == null) {
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
}
} else {
// point from snapshot split to binlog split
if (currentReader != null) {
LOG.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close();
}
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
LOG.info("BinlogSplitReader is created.");
}
// 提交一个split到reader,reader会在在submitSplit方法创建ReadTask对象,进行读取数据,将数据放入StatefulTaskContext的queue中,readTask放入线程池执行任务
currentReader.submitSplit(nextSplit);
}
}
private boolean canAssignNextSplit() {
return currentReader == null || currentReader.isFinished();
}
}
上面基本运行流程已经走通了现在就差实际读取数据的阶段了,现在我们直接跟着代码一点一点走 看看实际的执行逻辑是什么样子的,怎么读取数据的,由于cdc的代码比较多,我们就过滤掉了,binlogReadTask太繁琐,我们就不一步一步讲解了,后面可以简单介绍一下流程看看
我们一次看看对于reader对应的处理逻辑
1.checkSplitWithSplitIds方法
在这个方法中最主要的是调用了submitSplit开始我们下面的读取数据的一个流程
代码语言:javascript复制
// ------------------------- SnapshotSplitReader.submitSplit方法 ------------------------------------------
public void submitSplit(MySqlSplit mySqlSplit) {
this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
statefulTaskContext.configure(currentSnapshotSplit);
// 拿到context的queue,在pollSplitSrecords的时候需要
this.queue = statefulTaskContext.getQueue();
this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
this.hasNextElement.set(true);
this.reachEnd.set(false);
// 主要读取逻辑在readTask中
this.splitSnapshotReadTask =
new MySqlSnapshotSplitReadTask(
statefulTaskContext.getConnectorConfig(),
statefulTaskContext.getOffsetContext(),
statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
statefulTaskContext.getDatabaseSchema(),
statefulTaskContext.getConnection(),
statefulTaskContext.getDispatcher(),
statefulTaskContext.getTopicSelector(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
// 提交一个runnable到线程中,主要是执行readTask的execute方法
executor.submit(
() -> {
try {
currentTaskRunning = true;
// 自己实现的contextImpl 主要记录高水位和低水位用
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();
// 执行readTask
SnapshotResult snapshotResult =
splitSnapshotReadTask.execute(sourceContext);
final MySqlBinlogSplit backfillBinlogSplit =
createBackfillBinlogSplit(sourceContext);
// optimization that skip the binlog read when the low watermark equals high
// watermark
// 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read
final boolean binlogBackfillRequired =
backfillBinlogSplit
.getEndingOffset()
.isAfter(backfillBinlogSplit.getStartingOffset());
if (!binlogBackfillRequired) {
dispatchHighWatermark(backfillBinlogSplit);
currentTaskRunning = false;
return;
}
// snapshot执行完成后,开始binlogReadTask的读取操作
if (snapshotResult.isCompletedOrSkipped()) {
// 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task
final MySqlBinlogSplitReadTask backfillBinlogReadTask =
createBackfillBinlogReadTask(backfillBinlogSplit);
// 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了
// 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会
// 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
readException =
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
}
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
}
});
}
// ------------------------- MySqlSnapshotSplitReadTask.execute(sourceContext)方法 ------------------------------------------
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个
final SnapshotContext ctx;
try {
ctx = prepare(context); //重新new了一个 context对象,比较无用
} catch (Exception e) {
LOG.error("Failed to initialize snapshot context.", e);
throw new RuntimeException(e);
}
try {
// 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可
return doExecute(context, ctx, snapshottingTask);
} catch (InterruptedException e) {
LOG.warn("Snapshot was interrupted before completion");
throw e;
} catch (Exception t) {
throw new DebeziumException(t);
}
}
// ------------------------- MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法 ------------------------------------------
@Override
protected SnapshotResult doExecute(
ChangeEventSourceContext context,
SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
ctx.offset = offsetContext;
// 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了
final SignalEventDispatcher signalEventDispatcher =
new SignalEventDispatcher(
offsetContext.getPartition(),
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());
// 其实log输出的日志就已经很清晰了
// 记录低水位
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
snapshotSplit);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setLowWatermark(lowWatermark);
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
LOG.info("Snapshot step 2 - Snapshotting data");
// 读取数据 主要方法重点介绍的地方
createDataEvents(ctx, snapshotSplit.getTableId());
// 记录高水位
final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setHighWatermark(highWatermark);
return SnapshotResult.completed(ctx.offset);
}
// 我们看看createDataEvents 调用过程
private void createDataEvents(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
TableId tableId)
throws Exception {
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
// receiver的逻辑我们就不看了,我这里介绍一下就好
// receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record
// 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻
snapshotReceiver.completeSnapshot();
}
// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑
private void createDataEventsForTable(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
EventDispatcher.SnapshotReceiver snapshotReceiver,
Table table)
throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
// 构建sql
final String selectSql =
StatementUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
LOG.info(
"For split '{}' of table {} using select statement: '{}'",
snapshotSplit.splitId(),
table.id(),
selectSql);
try (PreparedStatement selectStatement =
StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql
jdbcConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
connectorConfig.getQueryFetchSize());
// 然后对查询出来的数据进行封装成sourceRecord发送下游
ResultSet rs = selectStatement.executeQuery()) {
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
long rows = 0;
Threads.Timer logTimer = getTableScanLogTimer();
while (rs.next()) {
rows ;
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
for (int i = 0; i < columnArray.getColumns().length; i ) {
Column actualColumn = table.columns().get(i);
row[columnArray.getColumns()[i].position() - 1] =
readField(rs, i 1, actualColumn, table);
}
if (logTimer.expired()) {
long stop = clock.currentTimeInMillis();
LOG.info(
"Exported {} records for split '{}' after {}",
rows,
snapshotSplit.splitId(),
Strings.duration(stop - exportStart));
snapshotProgressListener.rowsScanned(table.id(), rows);
logTimer = getTableScanLogTimer();
}
// 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解
dispatcher.dispatchSnapshotEvent(
table.id(),
getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个
snapshotReceiver);
}
LOG.info(
"Finished exporting {} records for split '{}', total duration '{}'",
rows,
snapshotSplit.splitId(),
Strings.duration(clock.currentTimeInMillis() - exportStart));
} catch (SQLException e) {
throw new ConnectException("Snapshotting of table " table.id() " failed", e);
}
}
// ------------------------- dispatcher.dispatchSnapshotEvent方法之后的流程 ----------------------------------
// 进入evnentDisptcher.dispatchSnapshotEvent方法
public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
if (dataCollectionSchema == null) {
errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
}
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
@Override
public void changeRecord(DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
// 真正的放入队列的逻辑在这里调用
// receiver使我们传入的 对应BufferingSnapshotChangeRecordReceiver类
receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);
}
});
}
// BufferingSnapshotChangeRecordReceiver的changeRecord方法
// 前面简单介绍过他的处理逻辑了,就不必多做介绍了
@Override
public void changeRecord(DataCollectionSchema dataCollectionSchema,
Operation operation,
Object key, Struct value,
OffsetContext offsetContext,
ConnectHeaders headers)
throws InterruptedException {
Objects.requireNonNull(value, "value must not be null");
LOGGER.trace("Received change record for {} operation on key {}", operation, key);
if (bufferedEvent != null) {
queue.enqueue(bufferedEvent.get());
}
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
// the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
bufferedEvent = () -> {
SourceRecord record = new SourceRecord(
offsetContext.getPartition(),
offsetContext.getOffset(),
topicName, null,
keySchema, key,
dataCollectionSchema.getEnvelopeSchema().schema(), value,
null, headers);
return changeEventCreator.createDataChangeEvent(record);
};
}
2.pollSplitRecords方法,这个方法是拉取queue中的数据
上面把数据读取后写入到queue的流程已经捋清楚了,我们现在看看reader是在什么时候读取了queue的数据
代码语言:javascript复制public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
checkReadException();
if (hasNextElement.get()) {
// data input: [low watermark event][snapshot events][high watermark event][binlog
// events][binlog-end event]
// data output: [low watermark event][normalized events][high watermark event]
boolean reachBinlogEnd = false;
final List<SourceRecord> sourceRecords = new ArrayList<>();
while (!reachBinlogEnd) {
// 可以看到这里直接queue.poll直接拉取数据即可,在这里会判断一下当前evnet是否是到达了结束的水位线,实际上就是高水位的位置,到达结束水位线之后,我们就可以停止了
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());
if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
reachBinlogEnd = true;
break;
}
}
}
// snapshot split return its data once
hasNextElement.set(false);
return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
return null;
}
通过上面的阅读我们已经看到了数据从读取到,再到放入队列中的一个过程,这里对队列做一个说明,在上面的介绍中存在两个队列,一个queue,一个elementQueue,这个的区别在于,queue是在读取数据阶段,将数据放入queue,在FetcherTask中调用了reader的fetch方法,将queue中的数据拉取到,并将其加入到elementQueue
在上面的操作中数据已经放入了elementQueue中,现在我们看看elementQueue中的数据是在什么时候发送到下游的
这里我们需要重新回到MysqlSource中通过一张图来看看
我们在创建Reader的时候传入了一个MysqlRecordEmitter,在后面发送数据的时候是通过这个类
对于发送数据到下游的逻辑是在MysqlSourceReader的父类(SourceReaderBase)中,但是发送的类是有Emitter完成的
由于相关方法是有上层调用执行,我们就不多看了,就简单说明一下,系统调用SourceReaderBase.pollNext(),开始触发数据collect的操作,将其发送至下游节点
我们这里直接阅读MysqlRecordEmitter源码看看他发送数据的逻辑,其实到这里跟sourceFucntion实现的原理基本差不多了我们这里简单过一下即可
代码语言:javascript复制public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
new FlinkJsonTableChangeSerializer();
private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
private final boolean includeSchemaChanges;
private final OutputCollector<T> outputCollector;
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges) {
// 对数据Deserialization的一个对象,与单并行度的类是同一个,具体内部逻辑可以自己看
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;
this.outputCollector = new OutputCollector<>();
}
@Override
public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
throws Exception {
// 判断一下消息的事件类型,如果是一个sourceReocrd则发送下游,否则对应其事件的相关操作
if (isWatermarkEvent(element)) {
BinlogOffset watermark = getWatermark(element);
if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
HistoryRecord historyRecord = getHistoryRecord(element);
Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
for (TableChanges.TableChange tableChange : changes) {
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
}
if (includeSchemaChanges) {
emitElement(element, output);
}
} else if (isDataChangeRecord(element)) {
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
reportMetrics(element);
emitElement(element, output);
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
}
}
private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
outputCollector.output = output;
// 调用不同的Deserialization.deserialize方法完成数据的转换以及发送到下游
debeziumDeserializationSchema.deserialize(element, outputCollector);
}
private static class OutputCollector<T> implements Collector<T> {
private SourceOutput<T> output;
@Override
public void collect(T record) {
output.collect(record);
}
}
}
四.题外话 <Table相关内容>
代码语言:javascript复制// RowData的DeserializeSchema,对应上面使用到的DeserializeSchema 做一个简单的介绍
public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeserializationSchema<RowData> {
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
// 针对不同的操作类型,我们需要器对应的数据
// after表示更改之后的数据结果 before表示更改之前的数据
// 只有update的时候才需要同时使用 before和after(动动聪明脑袋瓜为啥呢)
// 针对不同的操作使用RowKind进行表示,在sql层面会根据数据的标识来进行对应处理,比如insert,比如update操作
// 所以对于table内容,我们只需要将数据转换成对应的RowData类型,并对其表示RowKind类型,框架便会帮我在sink的时候
// 做出对应的操作,我们无需编写相关代码来实现
// 所以 sql中使用 cdc 我们需要将其加上RowKind,对于后面的操作我们就无需关心了
// 我们在sql中 formt格式是json,实际上走的逻辑这里我也没看,因为我不太关心,如果大家关心的话或者想了解的,大家可以自己去看看
// 具体的实现方式
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
// 构建RowData的方法,我们有加进来,里面内容比较繁琐
// 讲讲大概内容
// 就是对于我们字段的类型,转换成对应的java中类型,当然这里面我说的是flink中,因为他对用到的类型都重新做了一层封装,这样才能支持我们db中的所有类型
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
// 具体的发送下游方法
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
} else {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, out);
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
emit(record, after, out);
}
}
}
对于我们想实现一个tableSource的话,我们需要继承DynamicTableSourceFactory,实现下面的方法,然后通过spi的方法将其动态的加载
- createDynamicTableSource : 创建一个具体的source
- factoryIdentifier : 可以认为是我们描述connector的名字,比如kafka
- requiredOptions : with后面所必须要有参数,比如 username,password等,如果没有抛出异常
- optionalOptions : with后面的配置项,这是可选择的可有可无