Flink: From Kafka to Kafka

2020-05-06 18:00:54

Why write this article?

Flink has been around for several years now, and as of 2020-05-05 the latest release is 1.10.0. It unifies batch and stream processing, and many large companies already run it in real production workloads, quite happily. All of that is common knowledge, but when I started thinking about how to adopt Flink at work, I had no idea where to begin. My company is small and has no real-time computing yet, but our ETL jobs run slowly and efficiency is falling behind. My plan was to first try Flink on some offline jobs to see whether it improves efficiency, and at the same time lay the groundwork for real-time computing. I searched the whole web for material. There are plenty of articles, including paid resources, but most of the sample code simply does not run; it really does not run. Part of that is no doubt because I know too little about Flink, but getting anything beyond word count to run end to end should not be this much trouble.

What this demo does

1. Generate JSON-formatted data and write it to Kafka topic1

2. Consume the messages from topic1 and write them to topic2

The goal is deliberately simple. Landing Flink in a real business inevitably means multiple rounds of data processing. Flink can do batch processing, but what it supports best is streaming data, Kafka data in particular. Once this pipeline runs, the only thing left before a real Flink deployment is the business logic itself, and with Flink SQL now available, implementing that logic is quick work.
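To make the Flink SQL point concrete, here is a minimal sketch (not part of the pipeline below) of querying the temperature records defined later in this article with plain SQL. It assumes the flink-table-api-java-bridge_2.11 and flink-table-planner_2.11 dependencies have been added to the pom (they are not in the pom below), and that `records` is a DataStream<FamilyMemberTemperatureRecord> already parsed from Kafka:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// assumes: env is the StreamExecutionEnvironment, records is a
// DataStream<FamilyMemberTemperatureRecord> parsed from the Kafka JSON
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table temperatures = tEnv.fromDataStream(records, "id, name, temperature, measureTime");
tEnv.createTemporaryView("temperatures", temperatures);
// business logic as plain SQL: flag readings of 37.3 or above as fevers
Table fevers = tEnv.sqlQuery(
        "SELECT name, temperature, measureTime FROM temperatures " +
        "WHERE CAST(temperature AS DOUBLE) >= 37.3");
tEnv.toAppendStream(fevers, Row.class).print();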

Code

There are really only four source files:

├── flink-learn-kafka-sink.iml
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── org
    │   │       └── apache
    │   │           └── flink
    │   │               └── learn
    │   │                   ├── Sink2Kafka.java
    │   │                   ├── model
    │   │                   │   └── FamilyMemberTemperatureRecord.java
    │   │                   └── utils
    │   │                       ├── GsonUtil.java
    │   │                       └── KafkaGenDataUtil.java
    │   └── resources
    └── test
        └── java

pom dependencies

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- JSON handling -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <!-- commons-lang3, needed for the RandomUtils used by the data generator -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>

model

COVID-19 affects everyone around us, so the example used here is a record of family body-temperature measurements.

package org.apache.flink.learn.model;

public class FamilyMemberTemperatureRecord {

    private int id;  // measurement number
    private String name;    // name
    private String temperature;    // body temperature
    private String measureTime;    // measurement time

    public FamilyMemberTemperatureRecord(int id, String name, String temperature, String measureTime) {
        this.id = id;
        this.name = name;
        this.temperature = temperature;
        this.measureTime = measureTime;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTemperature() {
        return temperature;
    }

    public void setTemperature(String temperature) {
        this.temperature = temperature;
    }

    public String getMeasureTime() {
        return measureTime;
    }

    public void setMeasureTime(String measureTime) {
        this.measureTime = measureTime;
    }
}

JSON utility class

Serializes objects to JSON before they are sent to Kafka.

package org.apache.flink.learn.utils;

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;

/**
 * Desc: JSON utility class
 * Created by suddenly on 2020-05-05
 */
 
public class GsonUtil {
    private final static Gson gson = new Gson();

    public static <T> T fromJson(String value, Class<T> type) {
        return gson.fromJson(value, type);
    }

    public static String toJson(Object value) {
        return gson.toJson(value);
    }

    public static byte[] toJSONBytes(Object value) {
        return gson.toJson(value).getBytes(StandardCharsets.UTF_8);
    }
}
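As a quick sanity check, a round trip through the utility looks like this (a usage sketch; the values are made up):

// object -> JSON string -> object
FamilyMemberTemperatureRecord record =
        new FamilyMemberTemperatureRecord(1, "suddenly", "36.5", "2020-05-05 12:00:00");
String json = GsonUtil.toJson(record);
// json is {"id":1,"name":"suddenly","temperature":"36.5","measureTime":"2020-05-05 12:00:00"}
FamilyMemberTemperatureRecord parsed = GsonUtil.fromJson(json, FamilyMemberTemperatureRecord.class);
System.out.println(parsed.getName() + " " + parsed.getTemperature());  // suddenly 36.5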

Data generation utility class

package org.apache.flink.learn.utils;

import org.apache.flink.learn.model.FamilyMemberTemperatureRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.commons.lang3.RandomUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Desc: Generates data and writes it to Kafka
 * Created by suddenly on 2020-05-05
 */
 
public class KafkaGenDataUtil {
    private static final String broker_list = "localhost:9092";
    private static final String topic = "tempeature-source";    // source topic

    public static void genDataToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i <= 100; i++) {
                Date currentTime = new Date();
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateString = formatter.format(currentTime);  // measurement time
                double bodyTemperature = (int) (RandomUtils.nextDouble(36.0, 38.5) * 10) / 10.0;  // body temperature, one decimal place
                FamilyMemberTemperatureRecord patient = new FamilyMemberTemperatureRecord(i, "suddenly", String.valueOf(bodyTemperature), dateString);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, GsonUtil.toJson(patient));
                producer.send(record);
                System.out.println("Recorded temperature: " + GsonUtil.toJson(patient));
                Thread.sleep(3 * 1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        genDataToKafka();
    }
}

Processing code

package org.apache.flink.learn;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import java.util.Properties;

/**
 * Desc: Reads data from Kafka and writes it to another Kafka topic
 * Created by suddenly on 2020-05-05
 */
 
public class Sink2Kafka {
    private static final String SOURCE_TOPIC = "tempeature-source"; // source topic: data is read from here
    private static final String SINK_TOPIC = "tempeature-sink";     // sink topic: records are forwarded here untouched
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tempeature-measure-group");   // consumer group id; Kafka uses it to track this job's offsets
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        // read from the source topic
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                SOURCE_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();
        // write to the sink topic
        student.addSink(new FlinkKafkaProducer011<>(
                "localhost:9092",
                SINK_TOPIC,
                new SimpleStringSchema()
        )).name("flink-connectors-kafka")
                .setParallelism(5);
        env.execute("flink learning connectors kafka");
    }
}

Running it

[Figure: generating the data]
[Figure: consuming the data]
[Figure: the data in the Kafka source and sink topics]

With that, we have generated data into Kafka, consumed it, and forwarded it to another Kafka topic.

Extensions

Think about how the pipeline above could be applied to offline workloads:

1. Swap the data-generation part for your offline data source.

2. Change the forwarding logic to data-cleaning logic (see the sketch after this list), and the offline job becomes a near-real-time one. For example, a job that used to be scheduled daily can start reading data hourly, cutting data latency from 24 hours to 1 hour, which is no small improvement.

3. If the offline pipeline later moves to real time, the real-time data will surely arrive through a message queue anyway. Assuming that queue is Kafka, the source data can be fed straight into the data source, and the processing logic needs essentially no changes.
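As a sketch of point 2 (my own illustration, not from the pipeline above; the 35-42 degree plausibility bounds are made-up example thresholds), the plain student.addSink(...) inside Sink2Kafka could be replaced with parsing, filtering, and re-serializing, using the GsonUtil and model classes defined earlier (plus an import of org.apache.flink.streaming.api.datastream.DataStream):

// replaces the plain student.addSink(...) in Sink2Kafka
DataStream<String> cleaned = student
        .map(value -> GsonUtil.fromJson(value, FamilyMemberTemperatureRecord.class))
        .filter(r -> {
            try {
                double t = Double.parseDouble(r.getTemperature());
                return t >= 35.0 && t <= 42.0;  // drop implausible readings
            } catch (NumberFormatException e) {
                return false;                   // drop malformed records
            }
        })
        .map(GsonUtil::toJson);
cleaned.addSink(new FlinkKafkaProducer011<>(
        "localhost:9092",
        SINK_TOPIC,
        new SimpleStringSchema()));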

How to run

1. Kafka obviously has to be installed.

2. The example above runs directly in IDEA; just copy the code. If it fails to compile, add the flink-dist jar to the project dependencies in IDEA. If you are also on a Mac, the /usr directory is hidden: when adding the directory, select Macintosh HD, then press Command+Shift+. to show hidden directories.

[Figure: adding the Flink base dependencies in IDEA]
