需求分析
卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速和通过的车辆的数量,为了统计实时的平均车速,我设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。
代码语言:javascript复制平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量
滑动窗口: 窗口长度是为5分钟,滑动步长为1分钟(为了测试方便,设置为10秒)
MySQL建表语句
代码语言:javascript复制DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`monitor_id` varchar(255) DEFAULT NULL,
`avg_speed` double DEFAULT NULL,
`car_count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目代码
代码语言:javascript复制package car;
import bean.MonitorInfo;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class JamWindow {
//day110612SourceMySQL.java是需求一
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStreamSource<String> dss =
env.socketTextStream("hadoop10", 9999);
SingleOutputStreamOperator<MonitorInfo> carDss = dss.map(new MapFunction<String, MonitorInfo>() {
@Override
public MonitorInfo map(String s) throws Exception {
String[] arr = s.split(",");
return new MonitorInfo(
Long.parseLong(arr[0]),
arr[1],
arr[2],
arr[3],
Double.parseDouble(arr[4]), arr[5], arr[6]);
}
});
//获取路口的ID,因为要按照路口分组
KeyedStream<MonitorInfo, String> keyedDS = carDss.keyBy(c -> c.getMonitorId());
//1> MonitorInfo{actionTime = 1686647521, monitorId = 0002, cameraId = 1, car = 豫A39396, speed = 90.0, roadId = 01, areaId = 20, speedLimit = null}
// 路口通过汽车的数量--基于时间的滚动窗口 10miao
SingleOutputStreamOperator<Tuple2<String, String>> avg = keyedDS
.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 设置滚动窗口长度为10秒
.apply(new WindowFunction<MonitorInfo, Tuple2<String, String>, String, TimeWindow>() { //IN OUT KEY W
@Override
public void apply(String key,
TimeWindow window,
Iterable<MonitorInfo> input,
Collector<Tuple2<String, String>> out)
throws Exception {
//累加窗口内通过车辆的车速之和和计算路口通过汽车的数量。
double speedSum = 0.0;
int count = 0;
for (MonitorInfo info : input) {
speedSum = info.getSpeed(); //速度累加
count ; //计数器
}
double averageSpeed = count > 0 ? speedSum / count : 0.0;
//如果 count 大于 0,则计算速度总和(speedSum)除以 count 得到平均速度;否则,将平均速度设置为 0.0。
String start = new Timestamp(window.getStart()).toString();
String end = new Timestamp(window.getEnd()).toString();
out.collect(Tuple2.of(key, averageSpeed "," count "," start "," end));
}
});
avg.print();
keyedDS.print();
//TODO 5.execute-执行
avg.addSink(
JdbcSink.sink(
"insert into t_average_speed (id,start_time,end_time,monitor_id,avg_speed,car_count) values (null, ?, ?, ?, ?, ?)",
(preparedStatement, tuple) -> {
// 解析二元组的字段
String key = tuple.f0;
String[] fields = tuple.f1.split(",");
double averageSpeed = Double.parseDouble(fields[0]);
int carCount = Integer.parseInt(fields[1]);
String startTime = fields[2];
String endTime = fields[3];
// 设置参数
preparedStatement.setString(1, startTime);
preparedStatement.setString(2, endTime);
preparedStatement.setString(3, key);
preparedStatement.setDouble(4, averageSpeed);
preparedStatement.setInt(5, carCount);
},
JdbcExecutionOptions.builder()
.withBatchSize(1) //设置批处理大小,表示每次向数据库提交的批处理数据量。在这里设置为 1,表示每次只提交一条数据。
.withBatchIntervalMs(200)//设置批处理间隔时间,表示两次提交批处理之间的时间间隔。在这里设置为 200 毫秒,表示每隔 200 毫秒提交一次批处理。
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("0000")
.build()
)
);
env.execute();
}
}
代码语言:javascript复制这段代码实现了对车辆通过卡口的实时拥堵情况进行统计和存储。
代码解释如下:
导入所需的类和包。
创建StreamExecutionEnvironment实例,并设置运行模式为AUTOMATIC。
创建一个socketTextStream,从指定的主机和端口接收实时数据流。
使用map函数将接收到的文本数据转换为MonitorInfo对象。
使用keyBy操作将数据流按照卡口ID进行分区。
创建一个滚动窗口,窗口长度为10秒,对每个窗口内的数据进行处理。
在窗口函数apply中,累加窗口内通过车辆的车速之和和计算路口通过的车辆数量。
计算窗口内平均车速,如果有通过的车辆,则计算速度总和除以车辆数量得到平均速度;否则,平均速度设置为0.0。
获取窗口的起始时间和结束时间,并将结果以元组形式输出。
使用print()函数打印计算得到的平均车速。
使用print()函数打印分区后的数据流。
将结果写入到MySQL数据库中。
使用JdbcSink.sink()方法创建JDBC sink。
设置插入数据的SQL语句,使用占位符表示待填充的参数。
使用lambda表达式定义参数填充逻辑,将元组中的字段值设置到预编译语句中的对应位置。
使用JdbcExecutionOptions设置批处理大小和间隔时间。
使用JdbcConnectionOptions设置数据库连接信息。
将JDBC sink添加到数据流中,用于将数据写入MySQL数据库。
调用env.execute()方法启动Flink程序的执行。
总体来说,该代码通过对车辆数据流的处理,统计每个卡口窗口内的平均车速和通过的车辆数量,并将结果写入到MySQL数据库中。同时,通过print()函数打印中间结果,方便调试和观察程序执行过程。
测试流程
任意从端口发送对应格式的数据即可