// Code language: Java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"n");
SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
// 将流转化为表
Table table = tableEnv.fromDataStream(waterDS,
$("id"),
$("ts"),
$("vc"),
$("pt").proctime());
tableEnv.createTemporaryView("EventTable", table);
tableEnv.executeSql("CREATE TABLE flinksink ("
"componentname STRING,"
"componentcount BIGINT NOT NULL,"
"componentsum BIGINT"
") WITH ("
"'connector.type' = 'jdbc',"
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',"
"'connector.table' = 'flinksink',"
"'connector.driver' = 'com.mysql.cj.jdbc.Driver',"
"'connector.username' = 'root',"
"'connector.password' = 'root',"
"'connector.write.flush.max-rows'='3'rn"
")"
);
Table mysql_user = tableEnv.from("flinksink");
mysql_user.printSchema();
Table result = tableEnv.sqlQuery(
"SELECT "
"id as componentname, " //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum "
"FROM TABLE( "
"TUMBLE( TABLE EventTable , "
"DESCRIPTOR(pt), "
"INTERVAL '10' SECOND)) "
"GROUP BY id , window_start, window_end"
);
//方式一:写入数据库
// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //追加模式
env.execute();
}