In the previous article on CEP, we used a custom Source and a custom Sink directly. Looking back through my earlier posts, it seems I never walked through this part properly, so today let's take last time's code and talk about Data Source and Data Sink.
At a high level, Flink's programming model can be summarized as: connect a data source, apply transformations to the data, and then sink the processed results, as shown in the figure below.
This can actually form a neat closed loop: when the processed results are sunk into another stream, that sink becomes the source of the next Flink job. Of course, you can also sink to a CSV file, or write to a database via JDBC.
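To make that shape concrete before we dive into the Kafka example, here is a minimal sketch of a source → transform → sink pipeline. The socket source, the uppercase map, and the print sink are placeholders of my own choosing, not part of the code discussed below.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. data source: read text lines from a socket (placeholder source)
        DataStream<String> source = env.socketTextStream("localhost", 9999);
        // 2. transformation: any map/filter/window logic goes here
        DataStream<String> transformed = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 3. data sink: print() is itself just another sink
        transformed.print();
        env.execute("source -> transform -> sink skeleton");
    }
}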
Data Source
We will stick with the air-quality example from the previous article: we build a generator to produce data, and then write that data into Kafka.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import wang.datahub.cep.event.AirQualityRecoder;
//import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class WriteIntoKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Map<String, String> prop = new HashMap<>();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(prop);

        // custom source: an infinite generator of AirQualityRecoder events
        DataStream<AirQualityRecoder> messageStream = env.addSource(new SimpleGenerator());

        // sink: the producer shipped with the Kafka connector
        DataStreamSink<AirQualityRecoder> airQualityVODataStreamSink = messageStream.addSink(
                new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
                        parameterTool.getRequired("topic"),
                        new SimpleAirQualityRecoderSchema()));
        messageStream.print();
        env.execute("write to kafka !!!");
    }

    public static class SimpleGenerator implements SourceFunction<AirQualityRecoder> {
        private static final long serialVersionUID = 1L;
        boolean running = true;

        @Override
        public void run(SourceContext<AirQualityRecoder> ctx) throws Exception {
            // keep emitting records until cancel() flips the flag
            while (running) {
                ctx.collect(AirQualityRecoder.createOne());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static class SimpleAirQualityRecoderSchema
            implements DeserializationSchema<AirQualityRecoder>, SerializationSchema<AirQualityRecoder> {
        @Override
        public AirQualityRecoder deserialize(byte[] message) throws IOException {
            //System.out.println(new String(message));
            ByteArrayInputStream bi = new ByteArrayInputStream(message);
            ObjectInputStream oi = new ObjectInputStream(bi);
            AirQualityRecoder obj = null;
            try {
                obj = (AirQualityRecoder) oi.readObject();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            bi.close();
            oi.close();
            return obj;
        }

        @Override
        public boolean isEndOfStream(AirQualityRecoder nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(AirQualityRecoder element) {
            byte[] bytes = null;
            try {
                ByteArrayOutputStream bo = new ByteArrayOutputStream();
                ObjectOutputStream oo = new ObjectOutputStream(bo);
                oo.writeObject(element);
                oo.flush(); // flush so the buffered object data reaches the byte array
                bytes = bo.toByteArray();
                bo.close();
                oo.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bytes;
            // return element.toCsvRec().getBytes();
        }

        @Override
        public TypeInformation<AirQualityRecoder> getProducedType() {
            return TypeInformation.of(new TypeHint<AirQualityRecoder>(){});
        }
    }
}
That is the complete code. We implement a class that implements the SourceFunction interface and overrides two methods, run and cancel: run tells the source context how to emit data, while cancel defines how to stop the generator. Passing an instance of this class to addSource on the execution environment is all it takes to hook up the data. In this example we also use the default sink provided by the Kafka connector to write the simulated data into Kafka.
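The event class AirQualityRecoder itself is not shown in this post. Based on the getters used above and the fact that it goes through Java serialization, here is a hedged sketch of what it presumably looks like; the field types, the sample values, and the createOne() factory are my assumptions, not the original class.

import java.io.Serializable;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

// Hypothetical reconstruction of the event POJO used throughout this post.
public class AirQualityRecoder implements Serializable {
    private String id;      // record id
    private String city;    // city name
    private int airQuality; // air-quality reading
    private Date emmit;     // emit time (changed to java.sql.Date for the CSV demo later)
    private long et;        // event timestamp in milliseconds

    // assumed factory used by the generator source
    public static AirQualityRecoder createOne() {
        AirQualityRecoder r = new AirQualityRecoder();
        r.id = UUID.randomUUID().toString();
        r.city = "beijing";
        r.airQuality = new Random().nextInt(500);
        r.emmit = new Date();
        r.et = System.currentTimeMillis();
        return r;
    }

    public String getId() { return id; }
    public String getCity() { return city; }
    public int getAirQuality() { return airQuality; }
    public Date getEmmit() { return emmit; }
    public long getEt() { return et; }
}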
Data Sink
The Sink part covers two topics: 1. Sink to JDBC, and 2. Sink to CSV via Flink SQL.
Sink to JDBC
First we create a sink class that extends RichSinkFunction and implements its three methods: open, close, and invoke. open and close are used to initialize and release resources, while invoke implements the actual sink action.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import wang.datahub.cep.event.AirQualityRecoder;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MySQLSink extends RichSinkFunction<AirQualityRecoder> {
    PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into sinktable(id, city, airquality, emmit, et) values(?, ?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement and the connection to release resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(AirQualityRecoder value, Context context) throws Exception {
        // assemble the row and execute the insert
        ps.setString(1, value.getId());
        ps.setString(2, value.getCity());
        ps.setInt(3, value.getAirQuality());
        ps.setDate(4, new java.sql.Date(value.getEmmit().getTime()));
        ps.setDate(5, new java.sql.Date(value.getEt()));
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/flinksink?useUnicode=true&characterEncoding=UTF-8",
                    "dafei1288", "dafei1288");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return con;
    }
}
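The job expects the target table to exist already. The original post does not show its DDL, so here is a hedged sketch that creates a sinktable matching the columns used by the insert above; the column types are my guess from the setters, adjust them to your own schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateSinkTable {
    public static void main(String[] args) throws Exception {
        // assumed DDL matching the columns used by MySQLSink
        String ddl = "CREATE TABLE IF NOT EXISTS sinktable ("
                + "  id VARCHAR(64),"
                + "  city VARCHAR(64),"
                + "  airquality INT,"
                + "  emmit DATE,"
                + "  et DATE"
                + ")";
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flinksink?useUnicode=true&characterEncoding=UTF-8",
                "dafei1288", "dafei1288");
             Statement st = con.createStatement()) {
            st.execute(ddl);
        }
    }
}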
Next, build a test class that reads the data coming in from Kafka and sinks it straight out.
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import wang.datahub.cep.WriteIntoKafka;
import wang.datahub.cep.event.AirQualityRecoder;

import java.util.HashMap;
import java.util.Map;

public class SinkToMySQLApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Map<String, String> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        // properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);

        // Kafka source, reusing the serialization schema from WriteIntoKafka
        FlinkKafkaConsumer010<AirQualityRecoder> consumer010 = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topic"),
                new WriteIntoKafka.SimpleAirQualityRecoderSchema(),
                parameterTool.getProperties());
        DataStream<AirQualityRecoder> aqrStream = env.addSource(consumer010);

        // custom JDBC sink
        MySQLSink mSink = new MySQLSink();
        aqrStream.addSink(mSink);
        env.execute("write to mysql");
    }
}
The job executes successfully.
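To double-check that rows actually landed in MySQL, a quick count query does the job. This small checker is my own addition, reusing the connection settings from the sink above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckSinkTable {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flinksink?useUnicode=true&characterEncoding=UTF-8",
                "dafei1288", "dafei1288");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM sinktable")) {
            if (rs.next()) {
                // number of rows written by the Flink job so far
                System.out.println("rows in sinktable: " + rs.getLong(1));
            }
        }
    }
}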
Sink to CSV via Flink SQL
This sink is a bit special: the actual sinking is performed by executing DML through Flink SQL. For this case we use the CsvTableSink provided by the API.
Here we register the input stream as a table named AirQualityRecoder, then register the sink table ss together with its field names and types, and finally write the data into a CSV file with the SQL statement
INSERT INTO ss SELECT id,city,airQuality,emmit,et FROM AirQualityRecoder
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import wang.datahub.cep.WriteIntoKafka;
import wang.datahub.cep.event.AirQualityRecoder;

import java.util.HashMap;
import java.util.Map;

public class FlinkSQLSinkToMySQLApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
        Map<String, String> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        // properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topic", "test1");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);

        // Kafka source, same schema as before
        FlinkKafkaConsumer010<AirQualityRecoder> consumer010 = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topic"),
                new WriteIntoKafka.SimpleAirQualityRecoderSchema(),
                parameterTool.getProperties());
        DataStream<AirQualityRecoder> aqrStream = env.addSource(consumer010);

        // register the stream as the table "AirQualityRecoder"
        tableEnv.registerDataStreamInternal("AirQualityRecoder", aqrStream);

        // register the CSV sink table "ss" with its field names and types
        TableSink csvSink = new CsvTableSink("E:\\devlop\\sourcespace\\cept\\src\\test\\aa", ",");
        String[] fieldNames = {"id", "city", "airQuality", "emmit", "et"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.STRING, Types.INT, Types.SQL_DATE, Types.LONG};
        tableEnv.registerTableSink("ss", fieldNames, fieldTypes, csvSink);

        // the DML statement that actually performs the sink
        tableEnv.sqlUpdate(
                "INSERT INTO ss SELECT id,city,airQuality,emmit,et FROM AirQualityRecoder");
        env.execute("write to mysql");
    }
}
To make the demo easier, the emmit field in AirQualityRecoder was changed to the java.sql.Date type. Below is the result of the sink.
That's it for Data Source and Data Sink for now. Feel free to reach out and exchange ideas with me.