FlinkCDC的探索与实践【SQL部分】2

2022-09-23 22:33:59 浏览数 (1)

1.示例代码

代码语言:javascript复制
public class FlinkCDCSQL {

	public static void main(String[] args) throws Exception {
		// 1.设置流的环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		// 2.设置并行度
		env.setParallelism(1);
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

		//2.创建 Flink-MySQL-CDC 的 Source
		tableEnv.executeSql("CREATE TABLE student2 ("  
				" id STRING,"  
				" name STRING,"  
				" address STRING"  
				") WITH ("  
				" 'connector' = 'mysql-cdc',"  
				" 'scan.startup.mode' = 'latest-offset',"  
				" 'scan.incremental.snapshot.enabled' = 'false',"  
				" 'hostname' = 'localhost',"  
				" 'port' = '3306',"  
				" 'username' = 'root',"  
				" 'password' = 'root',"  
				" 'database-name' = 'test',"  
				" 'table-name' = 'student'"  
				")");
		tableEnv.executeSql("select * from student2").print();
		env.execute();
	}
}

2.遇到的问题

问题1

代码语言:javascript复制
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:80)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 19 more

解决办法:

代码语言:javascript复制
Maven 将Flink版本换成
<flink-version>1.13.0</flink-version>

问题2

代码语言:javascript复制
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
	at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:182)
	at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:128)
	... 19 more

解决方案

代码语言:javascript复制
再SQL语句中添加这一行,问题解决,1.13版本需要表有主键
" 'scan.incremental.snapshot.enabled' = 'false',"

本人开通付费的知识群,如果需要可以添加QQ:975863632,需要99.9元即可加入,添加需要备注【云雀课堂知识群】,这里可以获取到上面的源码,如果遇到问题可以一起解决,同时可以一起学习和进步。

0 人点赞