【基于Flink的城市交通实时监控平台】需求三:实时车辆分布情况-滚动窗口-JSON解析为对象

2024-07-25 15:45:40 浏览数 (1)

需求分析

实时车辆分布情况,是指在一段时间内(比如:10分钟)整个城市中每个区分布多少量车。这里要注意车辆的去重,因为在10分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。

具体需求以及技术选型:

  • 使用Flink读取kafka中发送的Json会话;
  • 使用反序列化工具解析Json会话字符串为JavaBean对象MonitorInfo.java(详见需求一);
  • getAreaId作为key,而后设置滚动窗口;
  • apply中编写业务代码统计车量;
  • 写入MySQL数据库t_area_control
测试数据
代码语言:javascript复制
{"actionTime":1686647522,"monitorId":"0003","cameraId":"1","car":"豫A99999","speed":60,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99998","speed":60,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99997","speed":60,"roadId":"01","areaId":"10"}
测试结果查询样例:
代码语言:javascript复制
id   区域编号    车的数量      窗口的开始时间   		    窗口的结束时间
1     20           1           20230-06-19 18:30:00         20230-06-19 18:40:00
2     30           2           20230-06-19 18:30:00         20230-06-19 18:40:00
3     10           1           20230-06-19 18:30:00         20230-06-19 18:40:00
建表语句
代码语言:javascript复制
create table t_area_control(
	id 			int 	primary key auto_increment,
	area_id     varchar(50),
	car_count   int,
	window_start  varchar(50),
	window_end    varchar(50)
)
需求代码
代码语言:javascript复制
package car;

import bean.MonitorInfo;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import util.Constants;
import util.JSONDeserializationSchema;

import java.util.HashSet;
import java.util.Properties;

/**
 * 实时车辆分布情况
 * 实时车辆分布情况,是指在一段时间内(比如:10分钟)整个城市中每个区分布多少量车。
 * 这里要注意车辆的去重,因为在10分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。
 * 统计结果如下
 * id   区域编号    车的数量      窗口的开始时间   		    窗口的结束时间
 * 1     20           1           20230-06-19 18:30:00         20230-06-19 18:40:00
 * 2     30           2           20230-06-19 18:30:00         20230-06-19 18:40:00
 * 3     10           1           20230-06-19 18:30:00         20230-06-19 18:40:00
 */
public class Test5_RealTimeDistribution {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //TODO 2.source-加载数据
        //从kafka的topic1消费数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "flinkgroup3");
        //使用反序列化工具获取JSON字符串内容,将其解析为javaBean对象
        FlinkKafkaConsumer<MonitorInfo> consumer = new FlinkKafkaConsumer<MonitorInfo>("topic-car",
                new JSONDeserializationSchema<>(MonitorInfo.class), properties);
        DataStreamSource<MonitorInfo> ds1 = env.addSource(consumer);
        ds1.print();//MonitorInfo{actionTime = 1686647524, monitorId = 0005, cameraId = 1, car = 豫A99997, speed = 60.0, roadId = 01, areaId = 10, speedLimit = null}
        KeyedStream<MonitorInfo, String> dsKey = ds1.keyBy(v -> v.getAreaId());//获取areaId为key
        WindowedStream<MonitorInfo, String, TimeWindow> window =
                dsKey.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));//设置滚动窗口


        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> apply = window.apply(new WindowFunction<MonitorInfo, Tuple4<String, Integer, String, String>, String, TimeWindow>() {

            @Override
            public void apply(String s, TimeWindow window, Iterable<MonitorInfo> iterable, Collector<Tuple4<String, Integer, String, String>> collector) throws Exception {
                //将车牌放到集合中去重
                HashSet<String> cars = new HashSet<>();

                for (MonitorInfo car : iterable) {
                    cars.add(car.getCar());
                }
                String start = DateFormatUtils.format(window.getStart(), Constants.D1);
                String end = DateFormatUtils.format(window.getEnd(), Constants.D1);
                collector.collect(Tuple4.of(s, cars.size(), start, end));
            }
        });


        apply.addSink(
                JdbcSink.sink(
                        "insert into t_area_control values (null, ?, ?, ?,? )",
                        (ps, value) -> {
                            ps.setString(1, value.f0);
                            ps.setLong(2, value.f1);
                            ps.setString(3, value.f2);
                            ps.setString(4, value.f3);

                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1)
                                .withBatchIntervalMs(200)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("0000")
                                .build()
                ));


        env.execute();
    }
}

import util.Constants;该工具为解析时间戳工具:

代码语言:javascript复制
package util;

public class Constants {
    public static final String  D1 = "yyyy-MM-dd HH:mm:ss";
}

import util.JSONDeserializationSchema;该工具为解析JSON为JavbaBean对象:

代码语言:javascript复制
package util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class JSONDeserializationSchema<T> implements DeserializationSchema<T> {
    private Class<T> clz;

    public JSONDeserializationSchema(Class<T> clz) {
        this.clz = clz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return JSON.parseObject(new String(message),clz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clz);
    }

}
实验截图
在这里插入图片描述在这里插入图片描述

通过Kafka发送测试数据

查询MySQL表中结果

扩展内容

解析JSON为Bean对象使用了alibaba的maven依赖工具:

代码语言:javascript复制
    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
    </dependency>

测试案例:

代码语言:javascript复制
package test;

import bean.MonitorInfo;
import com.alibaba.fastjson.JSON;
//json测试
public class testJson {
    public static void main(String[] args) {
        String s1 = "{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99997","speed":60,"roadId":"01","areaId":"10"}n";
        MonitorInfo monitorInfo = JSON.parseObject(s1, MonitorInfo.class);
        System.out.println(monitorInfo);
    }
}

0 人点赞