代码语言:java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE WaterSensor ("
"id STRING,"
"ts BIGINT,"
"vc BIGINT,"
// "`pt` TIMESTAMP(3),"
// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND"
"pt as PROCTIME() "
") WITH ("
"'connector' = 'kafka',"
"'topic' = 'kafka_data_waterSensor',"
"'properties.bootstrap.servers' = '127.0.0.1:9092',"
"'properties.group.id' = 'test',"
"'scan.startup.mode' = 'earliest-offset',"
// "'json.fail-on-missing-field' = 'false',"
// "'json.ignore-parse-errors' = 'true',"
"'format' = 'json'"
")"
);
tableEnv.executeSql("CREATE TABLE flinksink ("
"componentname STRING,"
"componentcount BIGINT NOT NULL,"
"componentsum BIGINT"
") WITH ("
"'connector.type' = 'jdbc',"
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',"
"'connector.table' = 'flinksink',"
"'connector.driver' = 'com.mysql.cj.jdbc.Driver',"
"'connector.username' = 'root',"
"'connector.password' = 'root',"
"'connector.write.flush.max-rows'='3'rn"
")"
);
Table result = tableEnv.sqlQuery(
"SELECT "
"id as componentname, " //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum "
"FROM TABLE( "
"TUMBLE( TABLE WaterSensor , "
"DESCRIPTOR(pt), "
"INTERVAL '10' SECOND)) "
"GROUP BY id , window_start, window_end"
);
// //方式一:写入数据库
//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
env.execute();
}