客快物流大数据项目(六十六):车辆主题

2022-05-16 11:35:38 浏览数 (1)

车辆主题

一、背景介绍

车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。

二、指标明细

指标列表

维度

发车次数

各网点发车次数

各区域发车次数

各公司发车次数

最大发车次数

各网点最大发车次数

各区域最大发车次数

各分公司最大发车次数

最小发车次数

各网点最小发车次数

各区域最小发车次数

各分公司最小发车次数

平均发车次数

各网点平均发车次数

各区域平均发车次数

各分公司平均发车次数

三、表关联关系

1、事实表

表名

描述

tbl_transport_tool

车辆事实表

tbl_warehouse_transport_tool

仓库车辆关联表

2、维度表

表名

描述

tbl_dot

网点表

tbl_company

公司表

tbl_warehouse

仓库表

tbl_company_warehouse_map

公司仓库关联表

tbl_transport_tool

车辆表

3、关联关系

车辆表与维度表的关联关系如下:

四、车辆数据拉宽开发

1、拉宽后的字段

1.1、拉宽网点车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_codes

codeDesc/ttStateName

state_name

运输工具状态描述

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_dot

id

dot_id

网点id

tbl_dot

dotNumber

dot_number

网点编号

tbl_dot

dotName

dot_name

网点名称

tbl_dot

dotAddr

dot_addr

网点地址

tbl_dot

dotGisAddr

dot_gis_addr

网点GIS地址

tbl_dot

dotTel

dot_tel

网点电话

tbl_dot

manageAreaId

manage_area_id

网点管理辖区ID

tbl_dot

manageAreaGis

manage_area_gis

网点管理辖区地理围栏

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间年月日格式

1.2、拉宽仓库车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_warehouse

id

ws_id

仓库ID

tbl_warehouse

name

name

仓库名称

tbl_warehouse

addr

addr

仓库地址

tbl_warehouse

addrGis

addr_gis

仓库gis地址

tbl_warehouse

employeeId

employee_id

仓库负责人

tbl_warehouse

type

ws_type

仓库类型

tbl_warehouse

area

area

占地面积

tbl_warehouse

isLease

is_lease

是否租赁

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间年月日格式

2、SQL语句

2.1、拉宽网点车辆表

代码语言:javascript复制
SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool"  ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id" 
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"

2.2、拉宽仓库车辆表

代码语言:javascript复制
SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool"  twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id" 
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id" 
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"

3、Spark实现

实现步骤:

  • dwd目录下创建TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
  • 获取仓库表(tbl_warehouse)数据,并缓存数据
  • 获取公司表(tbl_company)数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
  • 根据交通工具id,在交通工具表中获取交通工具数据
  • 根据网点id,在网点表中获取网点数据
  • 根据公司id,在公司表中获取公司数据
  • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中
  • 删除缓存数据

3.1、初始化环境变量

初始化运单明细拉宽作业的环境变量

代码语言:javascript复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

3.2、加载运输工具表及车辆相关的表并缓存

  • 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
代码语言:javascript复制
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

3.3、定义网点车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

代码语言:javascript复制
//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
  .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
  .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    dotDF("id").as("dot_id"), //网点表dot_id
    dotDF("dotNumber").as("dot_number"), //网点表dot_number
    dotDF("dotName").as("dot_name"), //网点表dot_name
    dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
    dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
    dotDF("dotTel").as("dot_tel"), //网点表dot_tel
    dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
    dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中

网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
代码语言:javascript复制
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)

3.5、定义仓库车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

代码语言:javascript复制
// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    wsDF("id").as("ws_id"), //仓库表id
    wsDF("name"), //仓库表name
    wsDF("addr"), //仓库表addr
    wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
    wsDF("employeeId").as("employee_id"), //仓库表employee_id
    wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
    wsDF("area"), //仓库表area
    wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中

仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
代码语言:javascript复制
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

3.7、删除缓存数据

为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。

代码语言:javascript复制
//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()

3.8、完整代码

代码语言:javascript复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //加载车辆表数据(事实表)
    val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆表数据
    val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

    //加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司网点关联表的数据
    val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆关联表数据(事实表)
    val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库公司关联表
    val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库表数据
    val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

    //加载物流码表数据
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

    import sparkSession.implicits._
    //获取运输工具类型
    val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

    //获取运输工具状态
    val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

    //TODO 4)定义维度表与事实表的关联
    val left_outer: String = "left_outer"
    // 4.1:拉宽网点车辆表
    val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
      .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
      .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
      .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        dotDF("id").as("dot_id"), //网点表dot_id
        dotDF("dotNumber").as("dot_number"), //网点表dot_number
        dotDF("dotName").as("dot_name"), //网点表dot_name
        dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
        dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
        dotDF("dotTel").as("dot_tel"), //网点表dot_tel
        dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
        dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )

    // 4.2:拉宽仓库车辆表
    // 拉宽仓库车辆表
    val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
      .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
      .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
      .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        wsDF("id").as("ws_id"), //仓库表id
        wsDF("name"), //仓库表name
        wsDF("addr"), //仓库表addr
        wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
        wsDF("employeeId").as("employee_id"), //仓库表employee_id
        wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
        wsDF("area"), //仓库表area
        wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )

    //TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
    save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
    save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

    //TODO 6)将缓存的数据删除掉
    ttDF.unpersist()
    ttDotDF.unpersist()
    dotDF.unpersist()
    companyDotDF.unpersist()
    companyDF.unpersist()
    ttWsDF.unpersist()
    companyWareHouseMapDF.unpersist()
    wsDF.unpersist()
    codesDF.unpersist()

    sparkSession.stop()
  }
}

五、车辆数据指标开发

1、计算的字段

字段名

字段描述

id

数据产生时间

ttDotTotalCount

网点总发车次数

maxTtDotTotalCount

各网点最大发车次数

minTtDotTotalCount

各网点最小发车次数

avgTtDotTotalCount

各网点平均发车次数

areaDotTotalCount

区域总发车次数

maxAreaDotTotalCount

各区域最大发车次数

minAreaDotTotalCount

各区域最小发车次数

avgAreaDotTotalCount

各区域平均发车次数

companyDotTotalCount

公司总发车次数

maxCompanyDotTotalCount

各公司最大发车次数

minCompanyDotTotalCount

各公司最小发车次数

avgCompanyDotTotalCount

各公司平均发车次数

2、Spark实现

实现步骤:

  • 在dws目录下创建TransportToolDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 各网点发车次数
  • 各网点最大发车次数
  • 各网点最小发车次数
  • 各网点平均发车次数
  • 各区域发车次数
  • 各区域最大发车次数
  • 各区域最小发车次数
  • 各区域平均发车次数
  • 各公司发车次数
  • 各公司最大发车次数
  • 各公司最小发车次数
  • 各公司平均发车次数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值
  • 通过StructType构建指定Schema
  • 创建车辆主题指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表

2.1、初始化环境变量

代码语言:javascript复制
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 车辆主题指标开发
 */
object TransportToolDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取车辆明细宽表的数据
     * 4)对车辆明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  }
  
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

2.2、加载车辆宽表增量数据并缓存

加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

代码语言:javascript复制
//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

2.3、指标计算

代码语言:javascript复制
//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()

//导入隐式转换
import sparkSession.implicits._

//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)

  //返回指定日期的仓库明细数据
  val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

  //各网点的发车次数(西三旗:10,西二旗:20)
  val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各网点的总发车次数
  val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
  //各网点的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
  val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
  val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)

  // 各区域发车次数
  val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
  val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
  val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)

  // 各公司发车次数
  val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
  val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
  val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)

  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
  )

  dotRows.append(rowInfo)
  println(rowInfo)

  ttDotDetailByDayDF.unpersist()
  ttDotTotalCountDF.unpersist()
  areaDotTotalCountDF.unpersist()
  companyDotTotalCountDF.unpersist()
})

//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)

  //返回指定日期的仓库明细数据
  val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

  val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各仓库的总发车次数
  val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
  //各仓库的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
  val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
  val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)

  // 各区域发车次数
  val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
  val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
  val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)

  // 各公司发车次数
  val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
  val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
  val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)

  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
    if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
    if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
    if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
  )

  wsRows.append(rowInfo)
  println(rowInfo)

  ttWsDetailByDayDF.unpersist()
  areaTransportToolTotalCountDF.unpersist()
  companyTransportToolTotalCountDF.unpersist()
  whTransportToolTotalCountDF.unpersist()
})

2.4、通过StructType构建指定Schema

代码语言:javascript复制
//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("ttDotTotalCount", LongType, true, Metadata.empty),           //各网点发车次数
  StructField("maxTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最大发车次数
  StructField("minTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最小发车次数
  StructField("avgTtDotTotalCount", LongType, true, Metadata.empty),        //各网点平均发车次数
  StructField("areaDotTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyDotTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))

//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("whTransportToolTotalCount", LongType, true, Metadata.empty),           //各仓库发车次数
  StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最大发车次数
  StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最小发车次数
  StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库平均发车次数
  StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))

2.5、持久化指标数据到kudu表

代码语言:javascript复制
//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)

val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)

save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)

0 人点赞