前言
大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。
今天为大家带来Flink的一个综合应用案例:将数据写入Kafka,再从Kafka读取数据并存入MySQL。第一部分:写数据到Kafka中
代码语言:javascript复制 public static void writeToKafka() throws Exception{
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
props.put("key.serializer", CONST_SERIALIZER);
props.put("value.serializer", CONST_SERIALIZER);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//构建User对象,在name为data后边加个随机数
int randomInt = RandomUtils.nextInt(1, 100000);
User user = new User();
user.setName("data" randomInt);
user.setId(randomInt);
//转换成JSON
String userJson = JSON.toJSONString(user);
//包装成kafka发送的记录
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER, partition,
null, userJson);
//发送到缓存
producer.send(record);
System.out.println("向kafka发送数据:" userJson);
//立即发送
producer.flush();
}
重点:
代码语言:javascript复制// Hand the record to the producer's buffer
producer.send(record);
为了增强代码的健壮性(robustness),我们将常量单独拎出来:
代码语言:javascript复制 // Broker list of the local Kafka machine; used as "bootstrap.servers" by producer and consumer
public static final String BROKER_LIST = "192.168.88.161:9092";
// Kafka topic the user records are written to and read from
public static final String TOPIC_USER = "USER";
// Kafka partition used for both writing and reading
public static final Integer partition = 0;
// Fully qualified class name of the key/value serializer
public static final String CONST_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
// Fully qualified class name of the key/value deserializer
public static final String CONST_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
main方法如下:
代码语言:javascript复制public static void main(String[] args) {
while(true) {
try {
//每三秒写一条数据
TimeUnit.SECONDS.sleep(3);
writeToKafka();
} catch (Exception e) {
e.printStackTrace();
}
}
}
第二部分:从kafka获取数据
KafkaRickSourceFunction.java
代码语言:javascript复制import com.hy.flinktest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@Slf4j
public class KafkaRickSourceFunction extends RichSourceFunction<String>{
// Kafka consumer configuration, filled once by the static initializer below.
private static final Properties prop = new Properties();
// Loop flag: cleared by cancel(), read by the polling loop in run(). Flink may call
// cancel() from another thread, so the flag is volatile to make the write visible.
private volatile boolean running = true;
// Partition to consume; reuses the constant declared on the producer side.
private static final Integer partition = WritedatatoKafka.partition;

static {
    prop.put("bootstrap.servers", WritedatatoKafka.BROKER_LIST);
    // NOTE(review): KafkaConsumer is configured through bootstrap.servers; this entry is
    // presumably ignored by the client - confirm before removing it.
    prop.put("zookeeper.connect", "192.168.88.161:2181");
    // The topic name doubles as the consumer group id.
    prop.put("group.id", WritedatatoKafka.TOPIC_USER);
    prop.put("key.deserializer", WritedatatoKafka.CONST_DESERIALIZER);
    prop.put("value.deserializer", WritedatatoKafka.CONST_DESERIALIZER);
    prop.put("auto.offset.reset", "latest");
    prop.put("max.poll.records", "500");
    prop.put("auto.commit.interval.ms", "1000");
}
/**
 * Polls the configured topic partition and emits every record value downstream until
 * {@code running} is cleared.
 *
 * <p>The consumer is closed when the loop ends, whether by cancellation or by an exception.
 *
 * @param sourceContext context used to emit the record values
 * @throws Exception propagated from the Kafka client
 */
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    // try-with-resources closes the consumer on every exit path.
    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(prop)) {
        // Consume only the configured partition of TOPIC_USER.
        TopicPartition topicPartition =
                new TopicPartition(WritedatatoKafka.TOPIC_USER, partition);

        // This initial value should come from ZooKeeper or some other offset store.
        long offset = 0;
        placeOffsetToBestPosition(kafkaConsumer, offset, topicPartition);

        while (running) {
            // Wait up to one second for new records; an empty batch simply skips the loop body.
            ConsumerRecords<String, String> records =
                    kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                sourceContext.collect(record.value());
            }
        }
    }
}
然后,将offset定位到最合适的位置,并返回该offset:
代码语言:javascript复制 /**
* 将offset定位到最合适的位置,并返回最合适的offset。
* @param kafkaConsumer consumer
* @param offset offset
* @param topicPartition partition
* @return the best offset
*/
private long placeOffsetToBestPosition(
KafkaConsumer<String, String> kafkaConsumer,
long offset, TopicPartition topicPartition) {
List<TopicPartition> partitions = Collections.singletonList(topicPartition);
kafkaConsumer.assign(partitions);
long bestOffset = offset;
if (offset == 0) {
log.info("由于offset为0,重新定位offset到kafka起始位置.");
kafkaConsumer.seekToBeginning(partitions);
} else if (offset > 0) {
kafkaConsumer.seekToBeginning(partitions);
long startPosition = kafkaConsumer.position(topicPartition);
kafkaConsumer.seekToEnd(partitions);
long endPosition = kafkaConsumer.position(topicPartition);
if (offset < startPosition) {
log.info("由于当前offset({})比kafka的最小offset({})还要小,则定位到kafka的最小offset({})处。",
offset, startPosition, startPosition);
kafkaConsumer.seekToBeginning(partitions);
bestOffset = startPosition;
} else if (offset > endPosition) {
log.info("由于当前offset({})比kafka的最大offset({})还要大,则定位到kafka的最大offset({})处。",
offset, endPosition, endPosition);
kafkaConsumer.seekToEnd(partitions);
bestOffset = endPosition;
} else {
kafkaConsumer.seek(topicPartition, offset);
}
}
return bestOffset;
}
/**
 * Requests the source to stop by clearing the {@code running} flag.
 */
@Override
public void cancel() {
    this.running = false;
}
}
第三部分 主类:从kafka读取数据写入mysql
// 1. Build the stream execution environment and attach the Kafka source.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new KafkaRickSourceFunction());

// 2. Convert every JSON line read from Kafka into a User object.
DataStream<User> dataStream = dataStreamSource.map(lines -> JSONObject.parseObject(lines, User.class));

// 3. Collect the users received in each 5-second window and emit them as one list.
dataStream.timeWindowAll(Time.seconds(5L))
        .apply(new AllWindowFunction<User, List<User>, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<User> iterable, Collector<List<User>> out) throws Exception {
                List<User> users = Lists.newArrayList(iterable);
                // Windows without any record emit nothing.
                if (!users.isEmpty()) {
                    System.out.println("5秒内总共收到的条数:" + users.size());
                    out.collect(users);
                }
            }
        })
        // Sink each list to the database; use .print() here instead to write to the console.
        .addSink(new MysqlRichSinkFunction());
// NOTE(review): a Flink job only starts once env.execute(...) is called; that call is
// not part of this excerpt - confirm it exists in the enclosing main method.
第四部分: 写入到目标数据库sink MysqlRichSinkFunction.java
代码语言:javascript复制@Slf4j
public class MysqlRichSinkFunction extends RichSinkFunction<List<User>> {
private Connection connection = null;
private PreparedStatement ps = null;
@Override
public void open(Configuration parameters) throws Exception {
// super.open(parameters);
log.info("获取数据库连接");
connection = DbUtil.getConnection();
String sql = "insert into user1(id,name) values (?,?)";
ps = connection.prepareStatement(sql);
}
public void invoke(List<User> users, Context ctx) throws Exception {
//获取ReadMysqlResoure发送过来的结果
for(User user : users) {
ps.setLong(1, user.getId());
ps.setString(2, user.getName());
ps.addBatch();
}
//一次性写入
int[] count = ps.executeBatch();
log.info("成功写入Mysql数量:" count.length);
}
@Override
public void close() throws Exception {
//关闭并释放资源
if(connection != null) {
connection.close();
}
if(ps != null) {
ps.close();
}
}
}