文章目录- 驾驶行程分析定义和业务逻辑
- 驾驶行程的概念
- 驾驶行程分析业务的价值,分析什么指标
- 驾驶行程分析流程
- 驾驶行程分析任务设置
- 核心主类创建
- 知识点回顾——水印机制
- 知识点回顾——窗口分配
- 驾驶行程采样入库
- 驾驶行程入库
- 驾驶行程指标分析
- 驾驶行程的概念
- 驾驶行程分析业务的价值,分析什么指标
- 驾驶行程分析流程
- 核心主类创建
- 知识点回顾——水印机制
- 知识点回顾——窗口分配
- 驾驶行程采样入库
- 驾驶行程入库
- 驾驶行程指标分析
驾驶行程分析定义和业务逻辑
驾驶行程的概念
一个完整意义的驾驶的行程,定义 15 分钟作为一个完整的行程,15分钟是停车前最后一条数据和驾驶前第一条数据之间间隔15分钟
驾驶行程分析业务的价值,分析什么指标
单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。
驾驶行程分析流程
1.创建流执行环境 2.获取kafka中的数据 3.将json字符串解析成车辆数据对象 4.过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3 0x01:停车充电。0x02:行驶充电。0x03:未充电状态。0x04:充电完成。0xFE:异常。0xFF:无效。 5.分配水印机制,设置最大延迟时间 30s 6.超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟 7.对车辆数据进行分组,创建会话窗口 8.数据的采样分析 8.1.应用窗口,数据的采样分析 8.2.将分析的采样数据封装成数组,并将其保存到HBase中 9.数据的行程分析 9.1.应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象 9.2.将这个对象保存到hbase中 10.执行流环境任务
驾驶行程分析任务设置
核心主类创建
任务名称—— TripDriveTask
继承 BaseTask
获取流执行环境
读取 Kafka 数据源
数据的实时 ETL 操作,解析、转换,过滤出来正常的数据,过滤出来驾驶行程数据
chargeStatus=2 or chargeStatus=3
创建行程划分window
- 分配水印机制
- 根据 vin 进行分组
- 指定window为 SessionWindow
驾驶行程采样分析入库
驾驶行程划分与入库
代码语言:javascript复制实现步骤:
1:初始化flink流处理的运行环境(checkpoint设置、事件时间处理数据、并行度等等)
2:接入kafka数据源将数据读取到返回(指定topic、集群地址、消费者id等等)
3:将消费到的json字符串转换成itcastDataObj
4:过滤出来驾驶行程相关的数据,行车充电(chargeStatus=2)、未充电(=3)
5:对驾驶行程数据应用水印(允许数据延迟30s)
6:对加了水印的数据进行分组操作,应用窗口操作(session窗口)
6.1:车辆在驾驶行程中如果超过15m没有上报数据,可以认为是上一个行程的结束
7:驾驶行程采样数据的业务开发
7.1:对数据应用自定义窗口的业务逻辑处理(划分四类数据:5s、10s、15s、20s)
7.2:将自定义驾驶行程采样业务处理后的数据写入到hbase中
8:驾驶行程数据的业务开发
8.1:驾驶行程数据应用自定义窗口的业务逻辑处理
8.2:将自定义驾驶行程业务处理后的数据写入到hbase中
9:递交作业
知识点回顾——水印机制
- 水印机制解决数据延迟和乱序问题
知识点回顾——窗口分配
- BoundedOutofTimeExtractor ,抽取出来最大容忍延迟时间,指定事件时间
驾驶行程采样入库
采样逻辑
行程划分水印
根据 vin 进行分组
创建会话窗口
处理行程划分,指定window function
引入 guava 库中的Lists类,创建一个可变的包含给定元素的ArrayList示例
代码语言:javascript复制<dependency>
<groupId>com.google.guavagroupId>
<artifactId>guavaartifactId>
<version>${guava.version}version>
dependency>
引入 window function,处理行程划分采样逻辑—— DriveSampleWindowFunction
代码语言:javascript复制//创建驾驶行程采集数据自定义函数开发类 窗口内数据按照5m,10m,20m维度进行数据的收集和分析,此类继承于RichWindowFunction 抽象类
//1.重写 apply 方法
//1.1 将迭代器转换成集合列表
//1.2 对集合列表的数据进行排序操作
//1.3 首先要获取排序后的第一条数据从而得到周期(5m等)的开始时间
//1.4 采样数据的soc(剩余电量百分比)、mileage(累计里程)、speed(车速)、gps(经度 维度)、terminalTime(终端时间)字段属性需要拼接到一个字符串返回
//soc(剩余电量百分比)
//mileage(累计里程)
//speed(车速)
//gps:地理位置
//terminalTime:终端时间
//1.5 获取排序后的最后一条数据作为当前窗口的最后一条数据
//1.6 获取窗口的第一条数据的终端时间作为开始时间戳
//1.7 获取窗口的最后一条数据的终端时间作为结束时间戳
//1.8 遍历窗口内的每条数据,计算5m采样周期内的数据
//1.9 创建字符串数组类型用于存储采集到的车辆唯一编码,终端时间戳,剩余电量,总里程数,车速,地理gps,终端时间
//1.10 返回数据
驾驶行程采样入库 hbase
创建 HBase 的表空间 : TRIPDB
代码语言:javascript复制create_namespace 'TRIPDB'
list_namespace
创建入采样入库的表 : trip_sample,压缩方式是:snappy
代码语言:javascript复制create "TRIPDB:trip_sample",{ NAME => 'cf', COMPRESSION => 'SNAPPY' }
自定义sink类用于保存采样数据—— TripSampleToHBaseSink
代码语言:javascript复制//将数据保存到 TRIPDB:trip_sample 表
//数组对象生成Put对象
//通过车辆唯一编码 终端时间作为rowkey
//将soc,mileage,speed,gps,terminalTime,processTime=当前日期时间字符串 封装成put
驾驶行程入库
使用之前已经生成的按 vin 分组并分配水印的车辆数据
引入 TripModel 对象,用于接收返回的数据
代码语言:javascript复制import cn.itcast.utils.DateUtil;
import lombok.Data;
/**
* 定义驾驶行程计算结果对应的JavaBean对象
*/
@Data
public class TripModel {
//车架号
private String vin = "";
//上次报文soc
private Double lastSoc = -999999D;
//上次报文里程数
private Double lastMileage = -999999D;
//行程开始时间
private String tripStartTime = "";
//行程开始soc
private int start_BMS_SOC = -999999;
//行程开始经度
private Double start_longitude = -999999D;
//行程开始纬度
private Double start_latitude = -999999D;
//行程开始里程
private Double start_mileage = -999999D;
//结束soc
private int end_BMS_SOC = -999999;
//结束经度
private Double end_longitude = -999999D;
//结束纬度
private Double end_latitude = -999999D;
//结束里程
private Double end_mileage = -999999D;
//行程结束时间
private String tripEndTime = "" ;
//行程里程消耗
private Double mileage = -999999D;
//最高行驶车速
private Double max_speed = 0D;
//soc消耗
private Double soc_comsuption = 0D;
//行程消耗时间(分钟)
private Double time_comsuption = -999999D;
//总低速的个数
private Long total_low_speed_nums = 0L;
//总中速的个数
private Long total_medium_speed_nums = 0L;
//总高速个数
private Long total_high_speed_nums = 0L;
//低速soc消耗
private Double Low_BMS_SOC = 0D;
//中速soc消耗
private Double Medium_BMS_SOC = 0D;
//高速soc消耗
private Double High_BMS_SOC = 0D;
//低速里程
private Double Low_BMS_Mileage = 0D;
//中速里程
private Double Medium_BMS_Mileage = 0D;
//高速里程
private Double High_BMS_Mileage = 0D;
//是否为异常行程 0:正常行程 1:异常行程(只有一个采样点)
private int tripStatus = -999999;
/**
* 将驾驶行程计算结果数据保存到hdfs时候需要转换成可以被hive所识别的字符串格式
* @return
*/
public String toHiveString() {
StringBuilder resultString = new StringBuilder();
if (this.vin != "") resultString.append(this.vin).append("t"); else resultString.append("NULL").append("t");
if (this.tripStartTime != "") resultString.append(this.tripStartTime).append("t"); else resultString.append("NULL").append("t");
if (this.tripEndTime != "") resultString.append(this.tripEndTime).append("t"); else resultString.append("NULL").append("t");
if (this.lastSoc != -999999 ) resultString.append(this.lastSoc).append("t"); else resultString.append("NULL").append("t");
if (this.lastMileage != -999999 ) resultString.append(this.lastMileage).append("t"); else resultString.append("NULL").append("t");
if (this.start_BMS_SOC != -999999 ) resultString.append(this.start_BMS_SOC).append("t"); else resultString.append("NULL").append("t");
if (this.start_longitude != -999999 ) resultString.append(this.start_longitude).append("t"); else resultString.append("NULL").append("t");
if (this.start_latitude != -999999 ) resultString.append(this.start_latitude).append("t"); else resultString.append("NULL").append("t");
if (this.start_mileage != -999999 ) resultString.append(this.start_mileage).append("t"); else resultString.append("NULL").append("t");
if (this.end_BMS_SOC != -999999 ) resultString.append(this.end_BMS_SOC).append("t"); else resultString.append("NULL").append("t");
if (this.end_longitude != -999999 ) resultString.append(this.end_longitude).append("t"); else resultString.append("NULL").append("t");
if (this.end_latitude != -999999 ) resultString.append(this.end_latitude).append("t"); else resultString.append("NULL").append("t");
if (this.end_mileage != -999999 ) resultString.append(this.end_mileage).append("t"); else resultString.append("NULL").append("t");
if (this.mileage != -999999 ) resultString.append(this.mileage).append("t"); else resultString.append("NULL").append("t");
if (this.max_speed != -999999 ) resultString.append(this.max_speed).append("t"); else resultString.append("NULL").append("t");
if (this.soc_comsuption != -999999 ) resultString.append(this.soc_comsuption).append("t"); else resultString.append("NULL").append("t");
if (this.time_comsuption != -999999 ) resultString.append(this.time_comsuption).append("t"); else resultString.append("NULL").append("t");
if (this.total_low_speed_nums != -999999 ) resultString.append(this.total_low_speed_nums).append("t"); else resultString.append("NULL").append("t");
if (this.total_medium_speed_nums != -999999 ) resultString.append(this.total_medium_speed_nums).append("t"); else resultString.append("NULL").append("t");
if (this.total_high_speed_nums != -999999 ) resultString.append(this.total_high_speed_nums).append("t"); else resultString.append("NULL").append("t");
if (this.Low_BMS_SOC != -999999 ) resultString.append(this.Low_BMS_SOC).append("t"); else resultString.append("NULL").append("t");
if (this.Medium_BMS_SOC != -999999 ) resultString.append(this.Medium_BMS_SOC).append("t"); else resultString.append("NULL").append("t");
if (this.High_BMS_SOC != -999999 ) resultString.append(this.High_BMS_SOC).append("t"); else resultString.append("NULL").append("t");
if (this.Low_BMS_Mileage != -999999 ) resultString.append(this.Low_BMS_Mileage).append("t"); else resultString.append("NULL").append("t");
if (this.Medium_BMS_Mileage != -999999 ) resultString.append(this.Medium_BMS_Mileage).append("t"); else resultString.append("NULL").append("t");
if (this.High_BMS_Mileage != -999999 ) resultString.append(this.High_BMS_Mileage).append("t"); else resultString.append("NULL").append("t");
if (this.tripStatus != -999999 ) resultString.append(this.tripStatus).append("t"); else resultString.append("NULL").append("t");
resultString.append(DateUtil.getCurrentDateTime());
return resultString.toString();
}
}
引入 window function,驾驶行程数据的实时计算逻辑—— DriveTripWindowFunction
代码语言:javascript复制//创建驾驶行程窗口内数据的自定义实时计算逻辑,此类继承于RichWindowFunction 抽象类
//1.重写 apply 方法
//1.1 将迭代器转换成集合列表
//1.2 对集合列表的数据进行排序操作
//1.3 将集合对象转换成 TripModel 对象返回
//1.4 将 TripModel 对象收集返回
//2.将驾驶行程指标计算getTripModel,得到 TripModel 对象
//2.1 获取第一条数据
//2.2 将 vin(车架号) ,tripStartTime(行程开始时间),start_BMS_SOC(行程开始Soc),start_longitude(行程开始经度),start_latitude(行程开始维度),start_mileage(行程开始表显里程数) 传递给 tripModel 对象。
//2.3 从最后一条数据中得到对象并赋值给 tripEndTime(行程结束时间),end_BMS_SOC(行程结束soc),end_longitude(行程结束经度),end_latitude(行程结束维度),end_mileage(行程结束表显里程数),mileage(行程驾驶公里数),time_comsuption(行程消耗时间)、这里存储的是分钟数,lastSoc(上次的行程Soc)、将当前行程开始的电量消耗百分比作为上一个行程结束的电量消耗百分比,lastMileage(上次的里程数)
//2.4 遍历 itcastDataObj list获取一下内容
// 每条数据的速度
//获取上次行程报文的soc
//计算每条数据的soc与lastSoc进行比较差值 socDiff
//如果socDiff大于0,那么赋值给 soc_comsuption
//如果speed大于tripModel对象中保存的最大车速并且小于150,将speed保存为最大速度
//低速行驶 speed >=0 && <40,低速行驶个数 1;low_BMS_SOC=low_BMS_SOC (当次油耗和上次油耗之差);setLow_BMS_Mileage=setLow_BMS_Mileage (当次里程和上次里程之差)
//中速行驶 >=40 && <80
//高速行驶 >80 && <150
//设置当前的soc作为 lastSoc 上次的soc 和将当次的里程作为 lastMileage 上次的里程
//增加扩展字段,判断当前是否有异常数据,如果列表长度大于1说明是正常行程0,否则是异常行程1
驾驶行程数据写入到 HBase —— TripDriveToHBaseSink
创建驾驶行程的表 : trip_division ,压缩方式是:snappy
代码语言:javascript复制create "TRIPDB:trip_division",{ NAME => 'cf', COMPRESSION => 'SNAPPY' }
自定义sink类用于保存采样数据——TripDivisionHBaseSink
代码语言:javascript复制//将数据保存到表 TRIPDB:trip_division
//数组对象生成Put对象
//通过车辆唯一编码 行程开始时间作为rowkey
//将车辆行程的字段分别写入进来 ...
驾驶行程指标分析
在 phoenix 创建行程采样视图
代码语言:javascript复制CREATE VIEW TRIPDB."trip_sample" ("rowNum" varchar PRIMARY KEY, "cf"."soc" varchar, "cf"."mileage" varchar, "cf"."speed" varchar, "cf"."gps" varchar, "cf"."terminalTime" varchar, "cf"."processTime" varchar);
在 MySQL中创建表 - t_sample_result 用于前端展示
代码语言:javascript复制create table vehicle_networking.t_sample_result(
id int auto_increment comment '主键' primary key,
name varchar(25) not null comment '采样样本指标名称',
totalNum int not null comment '样本总数',
processTime varchar(25) not null comment '样本总数计算时间'
) comment '采样样本指标统计结果表';
添加依赖,用于编写 phoenix jdbc 读写数据工具类
代码语言:javascript复制<dependency>
<groupId>org.apache.phoenixgroupId>
<artifactId>phoenix-coreartifactId>
<version>${phoenix.version}version>
<exclusions>
<exclusion>
<groupId>org.glassfishgroupId>
<artifactId>javax.elartifactId>
exclusion>
exclusions>
dependency>
导入 PhoenixJDBCUtil.java 工具类
导入 JDBCUtil.java 工具类
导入车辆采样分析的 分析类 TripSamplePhoenixAnalysis .java
指标查询
- 里程、soc、行程消耗时间分析
- 高速、中速、低速soc消耗分析
- 高速、中速、低速里程分析
- 高速、中速、低速车次分析
- 驾驶行程剩余指标业务