1.示例代码
代码语言:javascript复制public class FlinkCDCSQL {
public static void main(String[] args) throws Exception {
// 1.设置流的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// // 2.设置并行度
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.创建 Flink-MySQL-CDC 的 Source
tableEnv.executeSql("CREATE TABLE student2 ("
" id STRING,"
" name STRING,"
" address STRING"
") WITH ("
" 'connector' = 'mysql-cdc',"
" 'scan.startup.mode' = 'latest-offset',"
" 'scan.incremental.snapshot.enabled' = 'false',"
" 'hostname' = 'localhost',"
" 'port' = '3306',"
" 'username' = 'root',"
" 'password' = 'root',"
" 'database-name' = 'test',"
" 'table-name' = 'student'"
")");
tableEnv.executeSql("select * from student2").print();
env.execute();
}
}
2.遇到的问题
问题1
代码语言:javascript复制Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:80)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 19 more
解决办法:
代码语言:javascript复制Maven 将Flink版本换成
<flink-version>1.13.0</flink-version>
问题2
代码语言:javascript复制Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:182)
at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:128)
... 19 more
解决方案
代码语言:javascript复制再SQL语句中添加这一行,问题解决,1.13版本需要表有主键
" 'scan.incremental.snapshot.enabled' = 'false',"
本人开通付费的知识群,如果需要可以添加QQ:975863632,需要99.9元即可加入,添加需要备注【云雀课堂知识群】,这里可以获取到上面的源码,如果遇到问题可以一起解决,同时可以一起学习和进步。