(7)FlinkSQL将kafka数据写入到mysql方式二

2022-08-08 11:19:55 浏览数 (2)

代码语言:java
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE WaterSensor ("  
                "id STRING,"  
                "ts BIGINT,"  
                "vc BIGINT,"  
//                "`pt` TIMESTAMP(3)," 
//                "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND"  
                "pt as PROCTIME() "  
                ") WITH ("  
                "'connector' = 'kafka',"  
                "'topic' = 'kafka_data_waterSensor',"  
                "'properties.bootstrap.servers' = '127.0.0.1:9092',"  
                "'properties.group.id' =  'test',"  
                "'scan.startup.mode' = 'earliest-offset',"  
//                "'json.fail-on-missing-field' = 'false',"  
//                "'json.ignore-parse-errors' = 'true',"  
                "'format' = 'json'"  
                ")"
        );

        tableEnv.executeSql("CREATE TABLE flinksink ("  
                "componentname STRING,"  
                "componentcount BIGINT NOT NULL,"  
                "componentsum BIGINT"  
                ") WITH ("  
                "'connector.type' = 'jdbc',"  
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',"  
                "'connector.table' = 'flinksink',"  
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver',"  
                "'connector.username' = 'root',"  
                "'connector.password' = 'root',"  
                "'connector.write.flush.max-rows'='3'rn"  
                ")"
        );

        Table result = tableEnv.sqlQuery(
                "SELECT "  
                        "id as componentname, "                  //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum "  
                        "FROM TABLE( "  
                        "TUMBLE( TABLE WaterSensor , "  
                        "DESCRIPTOR(pt), "  
                        "INTERVAL '10' SECOND)) "  
                        "GROUP BY id , window_start, window_end"
        );

//        //方式一:写入数据库
////        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
        //方式二:写入数据库
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

        env.execute();
    }

0 人点赞