(1)通过FlinkSQL将数据写入mysql demo

2022-08-08 11:06:47 浏览数 (2)

FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。

(1)首先引入POM依赖:

代码语言:javascript复制
<properties>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
</dependencies>

(2)编写代码

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            //.useOldPlanner() // flink
            .useBlinkPlanner() // blink
            .build();
    StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);


    String ddl = "CREATE TABLE flinksinksds(rn"  
            "componentname STRING,rn"  
            "componentcount INT,rn"  
            "componentsum INTrn"  
            ") WITH(rn"  
            "'connector.type'='jdbc',rn"  
            "'connector.driver' =  'com.mysql.cj.jdbc.Driver',"  
            "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',rn"  
            "'connector.table'='flinksink',rn"  
            "'connector.username'='root',rn"  
            "'connector.password'='root',rn"  
            "'connector.write.flush.max-rows'='1'rn"  
            ")";
    System.err.println(ddl);
    ste.executeSql(ddl);

    String insert = "insert into flinksinksds(componentname,componentcount,componentsum)"  
            "values('1024', 1 , 2 )";
    ste.executeSql(insert);
    env.execute();
    System.exit(0);
}

(3)执行结果:

0 人点赞