2021 Big Data Flink (37): Table & SQL Case Four

2021-10-11 14:53:52


Case Four

Requirement

Consume data from Kafka, filter out the records whose status is success, and write them back to Kafka.

Test data (JSON messages for the input topic):
{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "fail"}
Prepare the Kafka topics and console tools:
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka


/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka


/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka


/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
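The first two commands create the input and output topics (3 partitions each, replication factor 2); the console producer feeds the test data above into input_kafka, and the console consumer watches output_kafka for the filtered results.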

Code Implementation

Apache Flink 1.12 Documentation: Table API & SQL

Apache Flink 1.12 Documentation: Apache Kafka SQL Connector
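Besides the basic Flink streaming dependencies, the example assumes the Flink 1.12 modules flink-table-api-java-bridge, flink-connector-kafka, and flink-json are on the classpath; exact artifact names depend on your Scala version suffix.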

package cn.it.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author lanson
 * Desc Consume from Kafka, keep only records whose status is 'success', and write them back to Kafka
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        //1. prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: register the Kafka source table with DDL
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );
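        // Note: 'scan.startup.mode' = 'latest-offset' means the job only reads records
        // produced after it starts; 'earliest-offset' would replay the topic from the beginning.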
        //3. Sink table: the filtered rows will be written here
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );
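        // Note: 'round-robin' distributes rows across the partitions of output_kafka in turn;
        // 'default' and 'fixed' are the other built-in partitioner options.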

        //4. Transformation: keep only the rows whose status is 'success'
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";

        Table resultTable = tEnv.sqlQuery(sql);

        // toRetractStream emits (flag, row) pairs: true marks an insert, false a retraction
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        //5. Sink: write the filtered rows back to Kafka
        // (Table.toString() registers resultTable under a generated name, so it can be referenced in SQL)
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);


        //6. execute (needed for the DataStream print() above)
        env.execute();
    }


}
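As a side note, the toRetractStream/print step above is only there for console debugging. A minimal pure-SQL sketch of the same pipeline needs just one INSERT statement, assuming the two DDL statements above have already been executed in the same session; in Flink 1.12, executeSql submits the INSERT as its own job, so env.execute() is not required for it:

        // Minimal pure-SQL variant (sketch): the input_kafka/output_kafka tables
        // created by the DDL above are assumed to already exist in this session.
        // executeSql on an INSERT submits its own Flink job.
        tEnv.executeSql(
                "INSERT INTO output_kafka " +
                        "SELECT user_id, page_id, status " +
                        "FROM input_kafka " +
                        "WHERE status = 'success'");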
