Custom Data Sink

2022-06-17 17:21:15

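Preface

The previous article introduced Flink Data Sinks and the sinks that ship with Flink. So how do you write a custom sink of your own? This article walks through a demo that reads data from a Kafka source and sinks it into MySQL.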

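Preparation

We start from the demo in which Flink reads data from a Kafka topic, so you need Flink and Kafka installed. Start Flink, ZooKeeper, and Kafka (see the article on custom data sources for details). With all three running, we can move on.

Creating the database table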

DROP TABLE IF EXISTS `Student`;
CREATE TABLE `Student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Entity class

Student.java

package com.thinker.model;
import lombok.*;

/**
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @apiNote Custom Data Sink
 * @since 2020-05-05
 */
@Setter
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Student {

    private int id;
    private String name;
    private String password;
    private int age;
}

Utility class

The utility class sends data to the Kafka topic student.

package com.thinker.util;

import com.thinker.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSON;

import java.util.Properties;

/**
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @apiNote
 * @since 2020-05-05
 */
public class KafkaUtils2 {
    private static final String broker_list = "localhost:9092";
    private static final String topic = "student";  // the Kafka topic must be the same one the Flink program reads

    private static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 1; i <= 200; i++) {
            Student student = new Student(i, "baiyu" + i, "password" + i, 18 + i);
            String json = JSON.toJSONString(student);
            // no partition and no key: the producer picks the partition
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, json);
            producer.send(record);
            System.out.println("Sent: " + json);
        }
        producer.flush();
        producer.close();  // release the producer's network resources
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }

}

SinkToMySQL

This class is the sink function. It extends RichSinkFunction and overrides its methods; the invoke method inserts each record into MySQL.
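Before the full class, the lifecycle it relies on can be sketched in plain Java without any Flink dependency. The interface `LifecycleSink`, the class `RecordingSink`, and the driver method `run` are hypothetical names made up for this illustration and are not part of Flink; the sketch only assumes the usual contract that open() is called once before any record, invoke() once per record, and close() once at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkLifecycleSketch {

    /** Hypothetical stand-in for the open/invoke/close contract of a rich sink. */
    interface LifecycleSink<T> {
        void open();
        void invoke(T value);
        void close();
    }

    /** Records every lifecycle call so the order can be inspected afterwards. */
    static class RecordingSink implements LifecycleSink<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open() {
            calls.add("open");            // a real sink would create its connection here
        }

        @Override
        public void invoke(String value) {
            calls.add("invoke:" + value); // a real sink would execute its insert here
        }

        @Override
        public void close() {
            calls.add("close");           // a real sink would release its resources here
        }
    }

    /** Drives a sink in the assumed order: open once, invoke per record, close once. */
    static <T> void run(LifecycleSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        run(sink, Arrays.asList("a", "b", "c"));
        System.out.println(sink.calls); // prints [open, invoke:a, invoke:b, invoke:c, close]
    }
}
```

Because open() runs only once, anything expensive to create, such as a database connection, belongs there rather than in invoke().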

package com.thinker.sql;

import com.thinker.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @apiNote
 * @since 2020-05-05
 */
public class SinkToMySQL  extends RichSinkFunction<Student> {


    private PreparedStatement ps;
    private Connection connection;

    /**
     * The connection is created in open(), so invoke() does not have to open and release a connection for every record
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release resources: close the statement before the connection that created it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record to insert
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        // Bind the fields and execute the insert
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
        System.out.println("sink to mysql");
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }

}
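The invoke() method above issues one INSERT per record. A common refinement, not part of the original demo, is to buffer records and write them in batches. The sketch below shows only the buffering logic in plain Java; the class `BatchBufferSketch` and its `flusher` callback are hypothetical names. In a JDBC sink the callback would presumably call addBatch() on the PreparedStatement for each record and then executeBatch(), and the sink's close() would call flush() so that a partial batch is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchBufferSketch<T> {

    private final int batchSize;
    private final Consumer<List<T>> flusher; // hypothetical callback that writes one batch
    private final List<T> buffer = new ArrayList<>();

    public BatchBufferSketch(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Would be called from invoke(): buffer the record and flush once the batch is full. */
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the buffered records to the callback, then empties the buffer. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBufferSketch<String> batcher =
                new BatchBufferSketch<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 1; i <= 7; i++) {
            batcher.add("student" + i);
        }
        batcher.flush();                // write the final partial batch
        System.out.println(batchSizes); // prints [3, 3, 1]
    }
}
```

The trade-off is that records still sitting in the buffer are lost if the job fails before the next flush, so this sketch alone gives weaker delivery guarantees than writing each record immediately.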

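Flink program

The source here reads from Kafka. After Flink reads the data (JSON) from Kafka, it uses Alibaba's fastjson to parse each string into a Student object, and then passes the SinkToMySQL we created to addSink, which stores the data in MySQL.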

package com.thinker.main;

import com.thinker.model.Student;
import com.alibaba.fastjson.JSON;
import com.thinker.sql.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class SinkToMysql {

   public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("zookeeper.connect", "localhost:2181");
       props.put("group.id", "metric-group");
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("auto.offset.reset", "latest");

       SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
               "student",   // this Kafka topic must match the topic used by the utility class above
               new SimpleStringSchema(),
               props)).setParallelism(1)
               .map(string -> JSON.parseObject(string, Student.class)); // fastjson parses the string into a Student object

       student.addSink(new SinkToMySQL()); // sink the data to MySQL

       env.execute("Flink add sink");
   }

}

Result

Run the Flink program first, then run the KafkaUtils2.java utility class. If the inserts succeeded, check the database:

The database now contains the 200 records we sent through Kafka, which shows that our SinkToMySQL works.
