【基于Flink的城市交通实时监控平台】需求二:卡口的实时拥堵情况-滑动窗口

2024-07-25 15:45:49 浏览数 (2)

需求分析

卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速和通过的车辆的数量,为了统计实时的平均车速,我设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。

代码语言:javascript复制
平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量
滑动窗口: 窗口长度是为5分钟,滑动步长为1分钟(为了测试方便,设置为10秒)
MySQL建表语句
代码语言:javascript复制
DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `start_time` bigint(20) DEFAULT NULL,
 `end_time` bigint(20) DEFAULT NULL,
 `monitor_id` varchar(255) DEFAULT NULL,
 `avg_speed` double DEFAULT NULL,
 `car_count` int(11) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目代码
代码语言:javascript复制
package car;

import bean.MonitorInfo;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class JamWindow {
//day110612SourceMySQL.java是需求一
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> dss =
                env.socketTextStream("hadoop10", 9999);

        SingleOutputStreamOperator<MonitorInfo> carDss = dss.map(new MapFunction<String, MonitorInfo>() {
            @Override
            public MonitorInfo map(String s) throws Exception {
                String[] arr = s.split(",");
                return new MonitorInfo(
                        Long.parseLong(arr[0]),
                        arr[1],
                        arr[2],
                        arr[3],
                        Double.parseDouble(arr[4]), arr[5], arr[6]);
            }
        });

        //获取路口的ID,因为要按照路口分组
        KeyedStream<MonitorInfo, String> keyedDS = carDss.keyBy(c -> c.getMonitorId());
        //1> MonitorInfo{actionTime = 1686647521, monitorId = 0002, cameraId = 1, car = 豫A39396, speed = 90.0, roadId = 01, areaId = 20, speedLimit = null}

        // 路口通过汽车的数量--基于时间的滚动窗口  10miao
        SingleOutputStreamOperator<Tuple2<String, String>> avg = keyedDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  // 设置滚动窗口长度为10秒
                .apply(new WindowFunction<MonitorInfo, Tuple2<String, String>, String, TimeWindow>() { //IN OUT KEY W
                    @Override
                    public void apply(String key,
                                      TimeWindow window,
                                      Iterable<MonitorInfo> input,
                                      Collector<Tuple2<String, String>> out)
                            throws Exception {
                        //累加窗口内通过车辆的车速之和和计算路口通过汽车的数量。
                        double speedSum = 0.0;
                        int count = 0;

                        for (MonitorInfo info : input) {
                            speedSum  = info.getSpeed(); //速度累加
                            count  ;                     //计数器
                        }
                        double averageSpeed = count > 0 ? speedSum / count : 0.0;
                        //如果 count 大于 0,则计算速度总和(speedSum)除以 count 得到平均速度;否则,将平均速度设置为 0.0。

                        String start = new Timestamp(window.getStart()).toString();
                        String end = new Timestamp(window.getEnd()).toString();

                        out.collect(Tuple2.of(key, averageSpeed   ","   count   ","   start   ","   end));
                    }
                });


        avg.print();
        keyedDS.print();
        //TODO 5.execute-执行

        avg.addSink(
                JdbcSink.sink(
                        "insert into t_average_speed (id,start_time,end_time,monitor_id,avg_speed,car_count) values (null, ?, ?, ?, ?, ?)",
                        (preparedStatement, tuple) -> {
                            // 解析二元组的字段
                            String key = tuple.f0;
                            String[] fields = tuple.f1.split(",");
                            double averageSpeed = Double.parseDouble(fields[0]);
                            int carCount = Integer.parseInt(fields[1]);
                            String startTime = fields[2];
                            String endTime = fields[3];

                            // 设置参数
                            preparedStatement.setString(1, startTime);
                            preparedStatement.setString(2, endTime);
                            preparedStatement.setString(3, key);
                            preparedStatement.setDouble(4, averageSpeed);
                            preparedStatement.setInt(5, carCount);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1) //设置批处理大小,表示每次向数据库提交的批处理数据量。在这里设置为 1,表示每次只提交一条数据。
                                .withBatchIntervalMs(200)//设置批处理间隔时间,表示两次提交批处理之间的时间间隔。在这里设置为 200 毫秒,表示每隔 200 毫秒提交一次批处理。
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("0000")
                                .build()
                )
        );


        env.execute();

    }
}
代码语言:javascript复制
这段代码实现了对车辆通过卡口的实时拥堵情况进行统计和存储。

代码解释如下:

导入所需的类和包。

创建StreamExecutionEnvironment实例,并设置运行模式为AUTOMATIC。

创建一个socketTextStream,从指定的主机和端口接收实时数据流。

使用map函数将接收到的文本数据转换为MonitorInfo对象。

使用keyBy操作将数据流按照卡口ID进行分区。

创建一个滚动窗口,窗口长度为10秒,对每个窗口内的数据进行处理。

在窗口函数apply中,累加窗口内通过车辆的车速之和和计算路口通过的车辆数量。

计算窗口内平均车速,如果有通过的车辆,则计算速度总和除以车辆数量得到平均速度;否则,平均速度设置为0.0。

获取窗口的起始时间和结束时间,并将结果以元组形式输出。

使用print()函数打印计算得到的平均车速。

使用print()函数打印分区后的数据流。

将结果写入到MySQL数据库中。

使用JdbcSink.sink()方法创建JDBC sink。
设置插入数据的SQL语句,使用占位符表示待填充的参数。
使用lambda表达式定义参数填充逻辑,将元组中的字段值设置到预编译语句中的对应位置。
使用JdbcExecutionOptions设置批处理大小和间隔时间。
使用JdbcConnectionOptions设置数据库连接信息。
将JDBC sink添加到数据流中,用于将数据写入MySQL数据库。
调用env.execute()方法启动Flink程序的执行。

总体来说,该代码通过对车辆数据流的处理,统计每个卡口窗口内的平均车速和通过的车辆数量,并将结果写入到MySQL数据库中。同时,通过print()函数打印中间结果,方便调试和观察程序执行过程。
测试流程

任意从端口发送对应格式的数据即可

0 人点赞