Writing Data from Flink to Kafka

2022-06-17 17:20:32

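Preface

The Flink documentation shows that Flink ships with a number of sinks out of the box, including a Kafka sink connector (FlinkKafkaProducer). This article walks through how to write data to Kafka with it.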

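Preparation

Flink provides connectors for Kafka 0.8, 0.9, 0.10, and 0.11.

Install Kafka first, then add the Flink Kafka connector dependency that matches your Kafka version. This article uses version 0.11: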

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
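
The artifactId in this dependency is assembled from the Kafka version and the Scala binary version. As a rough illustration of how the name resolves, here is a minimal sketch; the class ConnectorArtifact and its method are hypothetical, and Scala 2.11 is only an assumed value for ${scala.binary.version}.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, for illustration only; not part of Flink. */
final class ConnectorArtifact {

    // Kafka versions that the article lists as having a dedicated connector.
    private static final List<String> VERSIONED = Arrays.asList("0.8", "0.9", "0.10", "0.11");

    /** Builds the connector artifactId for a Kafka version and a Scala binary version. */
    static String artifactId(String kafkaVersion, String scalaBinaryVersion) {
        if (!VERSIONED.contains(kafkaVersion)) {
            throw new IllegalArgumentException("no versioned connector for Kafka " + kafkaVersion);
        }
        return "flink-connector-kafka-" + kafkaVersion + "_" + scalaBinaryVersion;
    }

    public static void main(String[] args) {
        // Scala 2.11 is an assumption; use the value your build actually defines.
        System.out.println(artifactId("0.11", "2.11")); // flink-connector-kafka-0.11_2.11
    }
}
```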

First, check whether the local Kafka already has a student-write topic by running this command:

./kafka-topics.sh --list --zookeeper localhost:2181

Output:

Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true  
__consumer_offsets  
metric  
student

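If student-write appears when we run this command again after the program has started, the program is working: the data read from the source Kafka, which can be another cluster, has been written to the local Kafka.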
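
The check described above can also be scripted. The following is a minimal sketch, assuming the raw output of kafka-topics.sh --list has already been captured as a string; TopicListCheck and its methods are hypothetical names, not part of Kafka or Flink.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper: not part of Flink or Kafka. */
final class TopicListCheck {

    /** Extracts topic names from the raw output of `kafka-topics.sh --list`. */
    static Set<String> parseTopics(String rawOutput) {
        Set<String> topics = new LinkedHashSet<>();
        for (String line : rawOutput.split("\\R")) {
            String name = line.trim();
            // Skip blank lines and the JVM banner printed when _JAVA_OPTIONS is set.
            if (name.isEmpty() || name.startsWith("Picked up ")) {
                continue;
            }
            topics.add(name);
        }
        return topics;
    }

    /** Returns true only on an exact name match, so "student" does not match "student-write". */
    static boolean hasTopic(String rawOutput, String topic) {
        return parseTopics(rawOutput).contains(topic);
    }

    public static void main(String[] args) {
        String before = String.join("\n",
                "Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true",
                "__consumer_offsets",
                "metric",
                "student");
        System.out.println(hasTopic(before, "student-write")); // false
        System.out.println(hasTopic(before, "student"));       // true
    }
}
```

Running the same check on the output captured after the job has started should report true for student-write if the sink is working.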

Code

package com.thinker.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @apiNote Reads strings from a Kafka topic, prints them, and writes them to the student-write topic.
 * @since 2020-05-14
 */
public class FlinkSinkToKafka {

    // Topic to read from. It must differ from the sink topic: reading and writing
    // the same topic on the same broker would feed every record back into the job.
    private static final String READ_TOPIC = "student";

    // Topic to write to. It only appears automatically if the broker allows topic auto-creation.
    private static final String WRITE_TOPIC = "student-write";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consumer settings. zookeeper.connect is only needed by the Kafka 0.8 connector,
        // and Flink deserializes records itself through SimpleStringSchema, so the
        // key/value deserializer entries are not needed either.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "student-group");
        props.put("auto.offset.reset", "latest");
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                READ_TOPIC,   // must match the topic the upstream producer writes to
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();

        // Producer settings. A producer has no consumer group, so only the broker address is set.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        student.addSink(new FlinkKafkaProducer011<String>(
                WRITE_TOPIC,
                new SimpleStringSchema(),
                properties
        )).name("flink-connectors-kafka").setParallelism(1);
        env.execute("flink learning connectors kafka");
    }
    }

}
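
The job needs two separate sets of Kafka settings, one for the source and one for the sink, and the two topics should not be the same when both sides talk to the same broker, otherwise the job would consume its own output. A minimal sketch of keeping these concerns apart, using only java.util.Properties; KafkaJobConfig and its method names are hypothetical, and the guard only compares topic names, so it assumes source and sink share one cluster.

```java
import java.util.Properties;

/** Hypothetical helper: the class and method names are illustrative only. */
final class KafkaJobConfig {

    /** Settings for the Kafka source: broker address, consumer group, fallback start position. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Used when the group has no committed offset to resume from.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    /** Settings for the Kafka sink: a producer has no consumer group, so no group.id. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    /** Rejects a configuration in which the job would consume its own output. */
    static void requireDistinctTopics(String sourceTopic, String sinkTopic) {
        if (sourceTopic.equals(sinkTopic)) {
            throw new IllegalArgumentException(
                    "source and sink topic are both '" + sourceTopic + "'");
        }
    }

    public static void main(String[] args) {
        requireDistinctTopics("student", "student-write");
        System.out.println(consumerProps("localhost:9092", "student-group"));
        System.out.println(producerProps("localhost:9092"));
    }
}
```

The returned Properties objects can be passed to the consumer and producer constructors in place of the inline ones.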

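Running the program

Copy the required jars into the corresponding directory of the Flink installation, then restart Flink.

Run the following command to submit the Flink job: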

./bin/flink run -c com.thinker.kafka.FlinkSinkToKafka ~/project/flink-test/target/flink-test-1.0-SNAPSHOT.jar

Once the job has been submitted successfully, run the following command:

./kafka-topics.sh --list --zookeeper localhost:2181

If the job is working, the output should now list student-write alongside the topics shown earlier.
