为什么要写这篇文章?
Flink出来已经好几年了,现在release版本已经发布到1.10.0(截止2020-05-05),统一了批处理和流处理,很多大公司也都用到生实际务中,跑得也很high。这些大家都知道,但是当我开始考虑怎么在工作中落地flink的时候,我不知道怎么入手。公司比较小,目前没有实时计算,但是etl任务跑得比较慢,效率上有些跟不上。我的思路是想先试着用Flink来处理一些离线任务,看看能不能提升效率,同时为落地实时计算做准备。全网找了半天资料,文章倒是很多,包括一些付费资源,大部分的实例代码都跑不通,真的是跑不通。当然有部分原因是因为我对flink了解太少,但是完整的跑通除了word count之外的代码不应该是一件比较麻烦的事。
功能说明
1.生成json格式数据写入kafka topic1
2.消费topic1中的消息,写入topic2
目的很简单,如果要落地到具体业务免不了需要做多次的数据处理,Flink虽说是可以做批处理,但是支持得最好的还是流数据,确切的说是kafka的数据,跑通了这个流程,实际上Flink的落地就只差业务逻辑了,现在有Flink SQL,实现业务逻辑也是分分钟的事。
代码
其实只有4个文件
代码语言:javascript复制├── flink-learn-kafka-sink.iml
├── pom.xml
└── src
├── main
│ ├── java
│ │ └── org
│ │ └── apache
│ │ └── flink
│ │ └── learn
│ │ ├── Sink2Kafka.java
│ │ ├── model
│ │ │ └── FamilyMemberTemperatureRecord.java
│ │ └── utils
│ │ ├── GsonUtil.java
│ │ └── KafkaGenDataUtil.java
│ └── resources
└── test
└── java
pom依赖
代码语言:javascript复制 <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- json 处理 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- kafka连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- kafka 客户端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
</dependencies>
model
新冠肺炎影响身边每一个人,举了一个测体温记录测例子
代码语言:java复制package org.apache.flink.learn.model;
public class FamilyMemberTemperatureRecord {
private int id; // 测量次数
private String name; // 姓名
private String temperature; // 体温
private String measureTime; // 测量时间
public FamilyMemberTemperatureRecord(int id, String name, String temperature, String measureTime) {
this.id = id;
this.name = name;
this.temperature = temperature;
this.measureTime = measureTime;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTemperature() {
return temperature;
}
public void setTemperature(String temperature) {
this.temperature = temperature;
}
public String getMeasureTime() {
return measureTime;
}
public void setMeasureTime(String measureTime) {
this.measureTime = measureTime;
}
}
json工具类
将对象解析为json格式的数据发给kafka
代码语言:java复制package org.apache.flink.learn.utils;
import com.google.gson.Gson;
import java.nio.charset.Charset;
/**
* Desc: json工具类
* Created by suddenly on 2020-05-05
*/
public class GsonUtil {
private final static Gson gson = new Gson();
public static <T> T fromJson(String value, Class<T> type) {
return gson.fromJson(value, type);
}
public static String toJson(Object value) {
return gson.toJson(value);
}
public static byte[] toJSONBytes(Object value) {
return gson.toJson(value).getBytes(Charset.forName("UTF-8"));
}
}
数据生成工具类
代码语言:java复制package org.apache.flink.learn.utils;
import org.apache.flink.learn.model.FamilyMemberTemperatureRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.commons.lang3.RandomUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
* Desc: 生成数据,写到kafka中
* Created by suddenly on 2020-05-05
*/
public class KafkaGenDataUtil {
private static final String broker_list = "localhost:9092";
private static final String topic = "tempeature-source"; // 数据源topic
public static void genDataToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 1; i <= 100; i ) {
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime); // 测量时间
Double body_tempeature = (int)(RandomUtils.nextDouble(36.0,38.5)*10)/10.0; // 体温
FamilyMemberTemperatureRecord patient = new FamilyMemberTemperatureRecord(i, "suddenly", String.valueOf(body_tempeature), dateString);
ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(patient));
producer.send(record);
System.out.println("记录体温: " GsonUtil.toJson(patient));
Thread.sleep(3 * 1000);
}
}catch (Exception e){
}
producer.flush();
}
public static void main(String[] args) throws InterruptedException {
genDataToKafka();
}
}
处理代码
代码语言:java复制package org.apache.flink.learn;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import java.util.Properties;
/**
* Desc: 从kafka中读数据,写到另一个kafka topic中
* Created by suddenly on 2020-05-05
*/
public class Sink2Kafka {
private static final String SOURCE_TOPIC = "tempeature-source"; // 数据源topic,从这里读数据
private static final String SINK_TOPIC = "tempeature-sink"; // 什么都不做,数据读出来之后直接写到这个目标topic
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "tempeature-measure-group"); // 这个随便起个名,没具体研究有什么用,我也是初学,先不用太在意这些细节
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
// 从source读数据
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
SOURCE_TOPIC,
new SimpleStringSchema(),
props)).setParallelism(1);
student.print();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "tempeature-measure-group");
// 写到sink里
student.addSink(new FlinkKafkaProducer011<>(
"localhost:9092",
SINK_TOPIC,
new SimpleStringSchema()
)).name("flink-connectors-kafka")
.setParallelism(5);
env.execute("flink learning connectors kafka");
}
}
运行效果
到此,我们实现了生成数据写到kafka,再把kafka的数据消费后,发到另一个kafka中。
扩展
思考一下,上面的处理过程怎么用到离线业务中
1.把数据生成部分换成离线业务的数据源
2.把转发部分的逻辑改成数据清洗逻辑,离线任务就变成准实时任务了(比如原来按天调度的任务,可以先改成按小时读数据,数据延时就从24小时变成1小时了,进步还是不小的)
3.如果未来离线要改为实时,实时数据肯定也是走消息队列,假设就是kafka,那生成的源数据直接打到data source中就可以了,处理逻辑基本不需要作修改
怎么运行
1.kafka肯定是要安装的
2.上面的例子直接在idea中运行的,代码copy下就可以,如果报错的话,需要把flink-dist的包添加到idea的依赖里,如果你也是mac,/usr目录被隐藏了,添加目录的时候选择Macintosh HD,再按commond shift .就能显示隐藏目录了