(5)FlinkSQL将socket数据写入到mysql方式二

2022-08-08 11:16:23 浏览数 (2)

代码语言:java
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"n");
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);


        tableEnv.executeSql("CREATE TABLE flinksink ("  
                "componentname STRING,"  
                "componentcount BIGINT NOT NULL,"  
                "componentsum BIGINT"  
                ") WITH ("  
                "'connector.type' = 'jdbc',"  
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',"  
                "'connector.table' = 'flinksink',"  
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver',"  
                "'connector.username' = 'root',"  
                "'connector.password' = 'root',"  
                "'connector.write.flush.max-rows'='3'rn"  
                ")"
        );
        Table mysql_user = tableEnv.from("flinksink");
        mysql_user.printSchema();

        Table result = tableEnv.sqlQuery(
                "SELECT "  
                        "id as componentname, "                  //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum "  
                        "FROM TABLE( "  
                        "TUMBLE( TABLE EventTable , "  
                        "DESCRIPTOR(pt), "  
                        "INTERVAL '10' SECOND)) "  
                        "GROUP BY id , window_start, window_end"
        );

        //方式一:写入数据库
//        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");

        //方式二:写入数据库
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        env.execute();
    }

0 人点赞