案例四
需求
从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka
代码语言:javascript复制{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
代码实现
Apache Flink 1.12 Documentation: Table API & SQL
Apache Flink 1.12 Documentation: Apache Kafka SQL Connector
代码语言:javascript复制package cn.it.sql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Author lanson
* Desc
*/
public class FlinkSQL_Table_Demo06 {
public static void main(String[] args) throws Exception {
//1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.Source
TableResult inputTable = tEnv.executeSql(
"CREATE TABLE input_kafka (n"
" `user_id` BIGINT,n"
" `page_id` BIGINT,n"
" `status` STRINGn"
") WITH (n"
" 'connector' = 'kafka',n"
" 'topic' = 'input_kafka',n"
" 'properties.bootstrap.servers' = 'node1:9092',n"
" 'properties.group.id' = 'testGroup',n"
" 'scan.startup.mode' = 'latest-offset',n"
" 'format' = 'json'n"
")"
);
TableResult outputTable = tEnv.executeSql(
"CREATE TABLE output_kafka (n"
" `user_id` BIGINT,n"
" `page_id` BIGINT,n"
" `status` STRINGn"
") WITH (n"
" 'connector' = 'kafka',n"
" 'topic' = 'output_kafka',n"
" 'properties.bootstrap.servers' = 'node1:9092',n"
" 'format' = 'json',n"
" 'sink.partitioner' = 'round-robin'n"
")"
);
String sql = "select "
"user_id,"
"page_id,"
"status "
"from input_kafka "
"where status = 'success'";
Table ResultTable = tEnv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
resultDS.print();
tEnv.executeSql("insert into output_kafka select * from " ResultTable);
//7.excute
env.execute();
}
}