Requirement Analysis
Real-time vehicle distribution means counting, over a period of time (e.g., 10 minutes), how many vehicles are present in each district of the city. Note that vehicles must be deduplicated: within those 10 minutes the same car will pass many different checkpoints, and cars with the same license plate are counted only once. In other words, deduplicate by license plate number.
Requirements and technology choices:
- Use Flink to read the JSON messages sent to Kafka;
- Use a deserialization schema to parse each JSON string into a MonitorInfo.java JavaBean (see requirement one; a sketch of the bean follows this list);
- Key the stream by getAreaId, then define a tumbling window;
- Count the vehicles in the business logic inside apply;
- Write the results into the MySQL table t_area_control.
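The MonitorInfo bean itself is defined in requirement one. For context, here is a minimal sketch of what it might look like, inferred from the JSON fields and the printed record further below; the field names follow the JSON keys, while the types are assumptions:

package bean;

//Hypothetical sketch of the MonitorInfo bean; the actual class is defined in requirement one.
public class MonitorInfo {
    private Long actionTime;    //event time in seconds
    private String monitorId;   //checkpoint id
    private String cameraId;
    private String car;         //license plate, used for deduplication
    private Double speed;
    private String roadId;
    private String areaId;      //district id, used as the key
    private Double speedLimit;  //null in the printed sample

    public Long getActionTime() { return actionTime; }
    public void setActionTime(Long actionTime) { this.actionTime = actionTime; }
    public String getMonitorId() { return monitorId; }
    public void setMonitorId(String monitorId) { this.monitorId = monitorId; }
    public String getCameraId() { return cameraId; }
    public void setCameraId(String cameraId) { this.cameraId = cameraId; }
    public String getCar() { return car; }
    public void setCar(String car) { this.car = car; }
    public Double getSpeed() { return speed; }
    public void setSpeed(Double speed) { this.speed = speed; }
    public String getRoadId() { return roadId; }
    public void setRoadId(String roadId) { this.roadId = roadId; }
    public String getAreaId() { return areaId; }
    public void setAreaId(String areaId) { this.areaId = areaId; }
    public Double getSpeedLimit() { return speedLimit; }
    public void setSpeedLimit(Double speedLimit) { this.speedLimit = speedLimit; }

    @Override
    public String toString() {
        return "MonitorInfo{actionTime = " + actionTime + ", monitorId = " + monitorId +
                ", cameraId = " + cameraId + ", car = " + car + ", speed = " + speed +
                ", roadId = " + roadId + ", areaId = " + areaId + ", speedLimit = " + speedLimit + "}";
    }
}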
Test Data
代码语言:javascript复制{"actionTime":1686647522,"monitorId":"0003","cameraId":"1","car":"豫A99999","speed":60,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99998","speed":60,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99997","speed":60,"roadId":"01","areaId":"10"}
Sample query of the test results:
id  area_id  car_count  window_start         window_end
1   20       1          2023-06-19 18:30:00  2023-06-19 18:40:00
2   30       2          2023-06-19 18:30:00  2023-06-19 18:40:00
3   10       1          2023-06-19 18:30:00  2023-06-19 18:40:00
Create Table Statement
create table t_area_control(
    id int primary key auto_increment,
    area_id varchar(50),
    car_count int,
    window_start varchar(50),
    window_end varchar(50)
);
Implementation Code
package car;
import bean.MonitorInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import util.Constants;
import util.JSONDeserializationSchema;
import java.util.HashSet;
import java.util.Properties;
/**
 * Real-time vehicle distribution.
 * Real-time vehicle distribution means counting, over a period of time (e.g., 10 minutes),
 * how many vehicles are present in each district of the city.
 * Note the deduplication: within those 10 minutes the same car will pass many different checkpoints,
 * and cars with the same license plate are counted only once, i.e., deduplicate by plate number.
 * Expected results:
 * id  area_id  car_count  window_start         window_end
 * 1   20       1          2023-06-19 18:30:00  2023-06-19 18:40:00
 * 2   30       2          2023-06-19 18:30:00  2023-06-19 18:40:00
 * 3   10       1          2023-06-19 18:30:00  2023-06-19 18:40:00
 */
public class Test5_RealTimeDistribution {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //TODO 2. source - load the data
        //consume data from the Kafka topic topic-car
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "flinkgroup3");
        //use the deserialization schema to parse each JSON string into a JavaBean object
        FlinkKafkaConsumer<MonitorInfo> consumer = new FlinkKafkaConsumer<MonitorInfo>("topic-car",
                new JSONDeserializationSchema<>(MonitorInfo.class), properties);
        DataStreamSource<MonitorInfo> ds1 = env.addSource(consumer);
        ds1.print();//MonitorInfo{actionTime = 1686647524, monitorId = 0005, cameraId = 1, car = 豫A99997, speed = 60.0, roadId = 01, areaId = 10, speedLimit = null}

        //key the stream by areaId
        KeyedStream<MonitorInfo, String> dsKey = ds1.keyBy(v -> v.getAreaId());
        //tumbling processing-time window (5 seconds here; the stated requirement uses a 10-minute window)
        WindowedStream<MonitorInfo, String, TimeWindow> window =
                dsKey.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> apply = window.apply(new WindowFunction<MonitorInfo, Tuple4<String, Integer, String, String>, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<MonitorInfo> iterable, Collector<Tuple4<String, Integer, String, String>> collector) throws Exception {
                //deduplicate license plates by collecting them into a set
                HashSet<String> cars = new HashSet<>();
                for (MonitorInfo car : iterable) {
                    cars.add(car.getCar());
                }
                String start = DateFormatUtils.format(window.getStart(), Constants.D1);
                String end = DateFormatUtils.format(window.getEnd(), Constants.D1);
                collector.collect(Tuple4.of(s, cars.size(), start, end));
            }
        });

        apply.addSink(
                JdbcSink.sink(
                        "insert into t_area_control values (null, ?, ?, ?, ?)",
                        (ps, value) -> {
                            ps.setString(1, value.f0);
                            ps.setInt(2, value.f1);
                            ps.setString(3, value.f2);
                            ps.setString(4, value.f3);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1)
                                .withBatchIntervalMs(200)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("0000")
                                .build()
                ));

        env.execute();
    }
}
import util.Constants;
This utility provides the date pattern used to format the window timestamps:
package util;
public class Constants {
    //date pattern used when formatting the window start/end timestamps
    public static final String D1 = "yyyy-MM-dd HH:mm:ss";
}
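For reference, DateFormatUtils.format from commons-lang3 accepts epoch milliseconds, which is exactly what window.getStart() and window.getEnd() return, so the window boundaries can be formatted directly. A quick sketch (the class name and timestamp value are purely illustrative):

package test;

import org.apache.commons.lang3.time.DateFormatUtils;
import util.Constants;

public class FormatDemo {
    public static void main(String[] args) {
        //window.getStart()/getEnd() return epoch milliseconds, which format directly
        long windowStartMillis = 1687170600000L; //illustrative value only
        System.out.println(DateFormatUtils.format(windowStartMillis, Constants.D1));
        //prints 2023-06-19 18:30:00 on a JVM running in the UTC+8 time zone
    }
}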
import util.JSONDeserializationSchema;
This utility parses JSON into a JavaBean object:
package util;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
//Generic schema that turns the raw Kafka record bytes into a JavaBean of the given class.
public class JSONDeserializationSchema<T> implements DeserializationSchema<T> {
    private Class<T> clz;

    public JSONDeserializationSchema(Class<T> clz) {
        this.clz = clz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        //decode the raw bytes (platform default charset) and map the JSON onto the target bean
        return JSON.parseObject(new String(message), clz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        //the stream is unbounded, so never signal end-of-stream
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        //let Flink derive the type information from the target class
        return TypeInformation.of(clz);
    }
}
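Since the schema only depends on fastjson, it can also be exercised outside Kafka and Flink. The sketch below feeds it the raw UTF-8 bytes of one test record (the class name and values are just for illustration):

package test;

import bean.MonitorInfo;
import util.JSONDeserializationSchema;
import java.nio.charset.StandardCharsets;

//Standalone check of the deserialization schema.
public class TestSchema {
    public static void main(String[] args) throws Exception {
        JSONDeserializationSchema<MonitorInfo> schema =
                new JSONDeserializationSchema<>(MonitorInfo.class);
        byte[] message = "{\"actionTime\":1686647522,\"monitorId\":\"0003\",\"cameraId\":\"1\",\"car\":\"豫A99999\",\"speed\":60,\"roadId\":\"01\",\"areaId\":\"20\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(schema.deserialize(message)); //prints the populated MonitorInfo
    }
}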
Experiment Screenshots
Sending the test data through Kafka
Querying the results in the MySQL table
Additional Notes
Parsing JSON into the bean object relies on Alibaba's fastjson library, added via the following Maven dependency:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.32</version>
</dependency>
Test case:
package test;
import bean.MonitorInfo;
import com.alibaba.fastjson.JSON;
//JSON parsing test
public class testJson {
    public static void main(String[] args) {
        String s1 = "{\"actionTime\":1686647524,\"monitorId\":\"0005\",\"cameraId\":\"1\",\"car\":\"豫A99997\",\"speed\":60,\"roadId\":\"01\",\"areaId\":\"10\"}";
        MonitorInfo monitorInfo = JSON.parseObject(s1, MonitorInfo.class);
        System.out.println(monitorInfo);
    }
}