车辆主题
一、背景介绍
车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。
二、指标明细
指标列表 | 维度 |
---|---|
发车次数 | 各网点发车次数 |
各区域发车次数 | |
各公司发车次数 | |
最大发车次数 | 各网点最大发车次数 |
各区域最大发车次数 | |
各分公司最大发车次数 | |
最小发车次数 | 各网点最小发车次数 |
各区域最小发车次数 | |
各分公司最小发车次数 | |
平均发车次数 | 各网点平均发车次数 |
各区域平均发车次数 | |
各分公司平均发车次数 |
三、表关联关系
1、事实表
表名 | 描述 |
---|---|
tbl_transport_tool | 车辆事实表 |
tbl_warehouse_transport_tool | 仓库车辆关联表 |
2、维度表
表名 | 描述 |
---|---|
tbl_dot | 网点表 |
tbl_company | 公司表 |
tbl_warehouse | 仓库表 |
tbl_company_warehouse_map | 公司仓库关联表 |
tbl_transport_tool | 车辆表 |
3、关联关系
车辆表与维度表的关联关系如下:
四、车辆数据拉宽开发
1、拉宽后的字段
1.1、拉宽网点车辆表
表 | 字段名 | 别名 | 字段描述 |
---|---|---|---|
tbl_transport_tool | id | id | 运输工具ID |
tbl_transport_tool | brand | brand | 运输工具品牌 |
tbl_transport_tool | model | model | 运输工具型号 |
tbl_transport_tool | type | type | 运输工具类型 |
tbl_codes | codeDesc/ttTypeName | type_name | 车辆类型描述 |
tbl_transport_tool | givenLoad | given_load | 额定载重 |
tbl_transport_tool | loadCnUnit | load_cn_unit | 中文载重单位 |
tbl_transport_tool | loadEnUnit | load_en_unit | 英文载重单位 |
tbl_transport_tool | buyDt | buy_dt | 购买时间 |
tbl_transport_tool | licensePlate | license_plate | 牌照 |
tbl_transport_tool | state | state | 运输工具状态 |
tbl_codes | codeDesc/ttStateName | state_name | 运输工具状态描述 |
tbl_transport_tool | cdt | cdt | 创建时间 |
tbl_transport_tool | udt | udt | 修改时间 |
tbl_transport_tool | remark | remark | 备注 |
tbl_dot | id | dot_id | 网点id |
tbl_dot | dotNumber | dot_number | 网点编号 |
tbl_dot | dotName | dot_name | 网点名称 |
tbl_dot | dotAddr | dot_addr | 网点地址 |
tbl_dot | dotGisAddr | dot_gis_addr | 网点GIS地址 |
tbl_dot | dotTel | dot_tel | 网点电话 |
tbl_dot | manageAreaId | manage_area_id | 网点管理辖区ID |
tbl_dot | manageAreaGis | manage_area_gis | 网点管理辖区地理围栏 |
tbl_company | id | company_id | 公司ID |
tbl_company | companyName | company_name | 公司名称 |
tbl_company | cityId | city_id | 城市ID |
tbl_company | companyNumber | company_number | 公司编号 |
tbl_company | companyAddr | company_addr | 公司地址 |
tbl_company | companyAddrGis | company_addr_gis | 公司gis地址 |
tbl_company | companyTel | company_tel | 公司电话 |
tbl_company | isSubCompany | is_sub_company | 母公司ID |
tbl_transport_tool | yyyyMMdd(cdt) | day | 创建时间年月日格式 |
1.2、拉宽仓库车辆表
表 | 字段名 | 别名 | 字段描述 |
---|---|---|---|
tbl_transport_tool | id | id | 运输工具ID |
tbl_transport_tool | brand | brand | 运输工具品牌 |
tbl_transport_tool | model | model | 运输工具型号 |
tbl_transport_tool | type | type | 运输工具类型 |
tbl_codes | codeDesc/ttTypeName | type_name | 车辆类型描述 |
tbl_transport_tool | givenLoad | given_load | 额定载重 |
tbl_transport_tool | loadCnUnit | load_cn_unit | 中文载重单位 |
tbl_transport_tool | loadEnUnit | load_en_unit | 英文载重单位 |
tbl_transport_tool | buyDt | buy_dt | 购买时间 |
tbl_transport_tool | licensePlate | license_plate | 牌照 |
tbl_transport_tool | state | state | 运输工具状态 |
tbl_transport_tool | cdt | cdt | 创建时间 |
tbl_transport_tool | udt | udt | 修改时间 |
tbl_transport_tool | remark | remark | 备注 |
tbl_warehouse | id | ws_id | 仓库ID |
tbl_warehouse | name | name | 仓库名称 |
tbl_warehouse | addr | addr | 仓库地址 |
tbl_warehouse | addrGis | addr_gis | 仓库gis地址 |
tbl_warehouse | employeeId | employee_id | 仓库负责人 |
tbl_warehouse | type | ws_type | 仓库类型 |
tbl_warehouse | area | area | 占地面积 |
tbl_warehouse | isLease | is_lease | 是否租赁 |
tbl_company | id | company_id | 公司ID |
tbl_company | companyName | company_name | 公司名称 |
tbl_company | cityId | city_id | 城市ID |
tbl_company | companyNumber | company_number | 公司编号 |
tbl_company | companyAddr | company_addr | 公司地址 |
tbl_company | companyAddrGis | company_addr_gis | 公司gis地址 |
tbl_company | companyTel | company_tel | 公司电话 |
tbl_company | isSubCompany | is_sub_company | 母公司ID |
tbl_transport_tool | yyyyMMdd(cdt) | day | 创建时间年月日格式 |
2、SQL语句
2.1、拉宽网点车辆表
代码语言:javascript复制SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool" ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id"
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"
2.2、拉宽仓库车辆表
代码语言:javascript复制SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool" twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id"
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id"
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"
3、Spark实现
实现步骤:
- 在dwd目录下创建TransportToolDWD 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 获取运输工具表(tbl_transport_tool)数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
- 获取网点表(tbl_dot)数据,并缓存数据
- 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
- 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
- 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
- 获取仓库表(tbl_warehouse)数据,并缓存数据
- 获取公司表(tbl_company)数据,并缓存数据
- 根据以下方式拉宽仓库车辆明细数据
- 根据交通工具id,在交通工具表中获取交通工具数据
- 根据网点id,在网点表中获取网点数据
- 根据公司id,在公司表中获取公司数据
- 根据仓库id,在仓库表中获取仓库数据
- 创建网点车辆明细宽表(若存在则不创建)
- 创建仓库车辆明细宽表(若存在则不创建)
- 将仓库车辆明细宽表数据写入到kudu数据表中
- 删除缓存数据
3.1、初始化环境变量
初始化运单明细拉宽作业的环境变量
代码语言:javascript复制package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}
3.2、加载运输工具表及车辆相关的表并缓存
- 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
3.3、定义网点车辆宽表的关联关系
- 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
代码语言:javascript复制//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中
网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建
实现步骤:
- 在TransportToolDWD 单例对象中调用save方法
实现过程:
- 在TransportToolDWD 单例对象Main方法中调用save方法
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
3.5、定义仓库车辆宽表的关联关系
- 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
代码语言:javascript复制// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中
仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建
实现步骤:
- 在TransportToolDWD 单例对象中调用save方法
实现过程:
- 在TransportToolDWD 单例对象Main方法中调用save方法
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
3.7、删除缓存数据
为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。
代码语言:javascript复制//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()
3.8、完整代码
代码语言:javascript复制package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()
sparkSession.stop()
}
}
五、车辆数据指标开发
1、计算的字段
字段名 | 字段描述 |
---|---|
id | 数据产生时间 |
ttDotTotalCount | 网点总发车次数 |
maxTtDotTotalCount | 各网点最大发车次数 |
minTtDotTotalCount | 各网点最小发车次数 |
avgTtDotTotalCount | 各网点平均发车次数 |
areaDotTotalCount | 区域总发车次数 |
maxAreaDotTotalCount | 各区域最大发车次数 |
minAreaDotTotalCount | 各区域最小发车次数 |
avgAreaDotTotalCount | 各区域平均发车次数 |
companyDotTotalCount | 公司总发车次数 |
maxCompanyDotTotalCount | 各公司最大发车次数 |
minCompanyDotTotalCount | 各公司最小发车次数 |
avgCompanyDotTotalCount | 各公司平均发车次数 |
2、Spark实现
实现步骤:
- 在dws目录下创建TransportToolDWS 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 指标计算
- 各网点发车次数
- 各网点最大发车次数
- 各网点最小发车次数
- 各网点平均发车次数
- 各区域发车次数
- 各区域最大发车次数
- 各区域最小发车次数
- 各区域平均发车次数
- 各公司发车次数
- 各公司最大发车次数
- 各公司最小发车次数
- 各公司平均发车次数
- 获取当前时间yyyyMMddHH
- 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
- 通过StructType构建指定Schema
- 创建车辆主题指标数据表(若存在则不创建)
- 持久化指标数据到kudu表
2.1、初始化环境变量
代码语言:javascript复制package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 车辆主题指标开发
*/
object TransportToolDWS extends OfflineApp{
//定义应用程序的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)创建SparkConf对象
* 2)创建SparkSession对象
* 3)读取车辆明细宽表的数据
* 4)对车辆明细宽表的数据进行指标的计算
* 5)将计算好的指标数据写入到kudu数据库中
* 5.1:定义指标结果表的schema信息
* 5.2:组织需要写入到kudu表的数据
* 5.3:判断指标结果表是否存在,如果不存在则创建
* 5.4:将数据写入到kudu表中
* 6)删除缓存数据
* 7)停止任务,退出sparksession
*/
//TODO 1)创建SparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//TODO 2)创建SparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//处理数据
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}
2.2、加载车辆宽表增量数据并缓存
加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
代码语言:javascript复制//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
2.3、指标计算
代码语言:javascript复制//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
//导入隐式转换
import sparkSession.implicits._
//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
//各网点的发车次数(西三旗:10,西二旗:20)
val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各网点的总发车次数
val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
//各网点的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
// 各区域发车次数
val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
// 各公司发车次数
val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
)
dotRows.append(rowInfo)
println(rowInfo)
ttDotDetailByDayDF.unpersist()
ttDotTotalCountDF.unpersist()
areaDotTotalCountDF.unpersist()
companyDotTotalCountDF.unpersist()
})
//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各仓库的总发车次数
val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
//各仓库的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
// 各区域发车次数
val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
// 各公司发车次数
val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
)
wsRows.append(rowInfo)
println(rowInfo)
ttWsDetailByDayDF.unpersist()
areaTransportToolTotalCountDF.unpersist()
companyTransportToolTotalCountDF.unpersist()
whTransportToolTotalCountDF.unpersist()
})
2.4、通过StructType构建指定Schema
代码语言:javascript复制//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("ttDotTotalCount", LongType, true, Metadata.empty), //各网点发车次数
StructField("maxTtDotTotalCount", LongType, true, Metadata.empty), //各网点最大发车次数
StructField("minTtDotTotalCount", LongType, true, Metadata.empty), //各网点最小发车次数
StructField("avgTtDotTotalCount", LongType, true, Metadata.empty), //各网点平均发车次数
StructField("areaDotTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyDotTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))
//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("whTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库发车次数
StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最大发车次数
StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最小发车次数
StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库平均发车次数
StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))
2.5、持久化指标数据到kudu表
代码语言:javascript复制//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)