Logistics Big Data Project for Express Delivery (64): Waybill Topic

2022-05-16 13:43:26

Waybill Topic

I. Background Introduction

A waybill is evidence of the contract of carriage and a receipt confirming that the carrier has taken over the goods. Each waybill covers one shipper, one consignee, one port of departure, and one port of destination. If goods from the same shipper belong to two or more consignees at the destination port, separate waybills must be issued.

Waybill statistics are aggregated along dimensions such as area, company, dot (branch outlet), route, and transport tool, so waybill counts can be ranked per dimension. For example, counting waybills per dot reflects how that dot is operating, and counting waybills per route shows the transport capacity of each route.

II. Metric Details

Metric | Dimensions
Waybill count | total waybill count
Maximum waybill count | per area, per company, per dot, per route, per transport tool, per customer type
Minimum waybill count | per area, per company, per dot, per route, per transport tool, per customer type
Average waybill count | per area, per company, per dot, per route, per transport tool, per customer type

III. Table Relationships

1. Fact table

Table | Description
tbl_waybill | waybill table

2. Dimension tables

Table | Description
tbl_courier | courier table
tbl_transport_record | transport record table
tbl_areas | area table
tbl_route | route table
tbl_dot | dot (branch outlet) table
tbl_company | company table
tbl_warehouse | warehouse table
tbl_customer | customer table
tbl_company_warehouse_map | company-warehouse mapping table
tbl_transport_tool | transport tool (vehicle) table
tbl_codes | logistics system code table

3. Relationships

The waybill fact table joins to the dimension tables as follows:

IV. Waybill Data Widening

1. Fields after widening

Source table | Field | Alias | Description
tbl_waybill | id | id | waybill id
tbl_waybill | expressBillNumber | express_bill_number | express bill number
tbl_waybill | waybillNumber | waybill_number | waybill number
tbl_waybill | cid | cid | customer id
tbl_customer | name | cname | customer name
tbl_codes | consumerType | ctype | customer type code
tbl_codes | consumerTypeName | ctype_name | customer type name
tbl_waybill | eid | eid | pickup employee id
tbl_courier | name | ename | pickup employee name
tbl_dot | id | dot_id | dot id
tbl_dot | dotName | dot_name | dot name
tbl_areas | id | area_id | area id
tbl_areas | name | area_name | area name
tbl_waybill | orderChannelId | order_channel_id | order channel id
tbl_codes | orderChannelName | order_channel_name | order channel name
tbl_waybill | orderDt | order_dt | order time
tbl_waybill | orderTerminalType | order_terminal_type | order terminal type id
tbl_waybill | orderTerminalOsType | order_terminal_os_type | order terminal OS type id
tbl_waybill | reserveDt | reserve_dt | scheduled pickup time
tbl_waybill | isCollectPackageTimeout | is_collect_package_timeout | whether pickup timed out
tbl_waybill | pkgId | pkg_id | package id
tbl_waybill | pkgNumber | pkg_number | package number
tbl_waybill | timeoutDt | timeout_dt | timeout time
tbl_waybill | transformType | transform_type | transport type
tbl_waybill | deliveryCustomerName | delivery_customer_name | sender name
tbl_waybill | deliveryAddr | delivery_addr | sender address
tbl_waybill | deliveryMobile | delivery_mobile | sender mobile number
tbl_waybill | deliveryTel | delivery_tel | sender telephone number
tbl_waybill | receiveCustomerName | receive_customer_name | receiver name
tbl_waybill | receiveAddr | receive_addr | receiver address
tbl_waybill | receiveMobile | receive_mobile | receiver mobile number
tbl_waybill | receiveTel | receive_tel | receiver telephone number
tbl_waybill | cdt | cdt | creation time
tbl_waybill | udt | udt | update time
tbl_waybill | remark | remark | waybill remark
tbl_transport_record | swId | sw_id | start warehouse id
tbl_warehouse | name | sw_name | start warehouse name
tbl_company | companyName | sw_company_name | start warehouse's company name
tbl_transport_record | ewId | ew_id | next-stop warehouse id
tbl_warehouse | name | ew_name | next-stop warehouse name
tbl_company | companyName | ew_company_name | next-stop company name
tbl_transport_tool | id | tt_id | transport tool id
tbl_transport_tool | licensePlate | tt_name | transport tool license plate
tbl_transport_record | routeId | route_id | transport route id
tbl_route | start_station + end_station | route_name | route start and end stations combined
tbl_waybill | yyyyMMdd(cdt) | day | creation date in yyyyMMdd format

2. SQL statement

Code (SQL):
SELECT
waybill."id",
waybill."express_bill_number",
waybill."waybill_number",
waybill."cid",
customer."name" AS cname,
customercodes."code" AS ctype,
customercodes."code_desc" AS ctype_name,
waybill."eid",
courier."name" AS ename,
dot."id" AS dot_id,
dot."dot_name",
area."id" AS area_id,
area."name" AS area_name,
waybill."order_channel_id",
orderchannelcodes."code_desc" AS order_channel_name,
waybill."order_dt",
waybill."order_terminal_type",
waybill."order_terminal_os_type",
waybill."reserve_dt",
waybill."is_collect_package_timeout",
waybill."pkg_id",
waybill."pkg_number",
waybill."timeout_dt",
waybill."transform_type",
waybill."delivery_customer_name",
waybill."delivery_addr",
waybill."delivery_mobile",
waybill."delivery_tel",
waybill."receive_customer_name",
waybill."receive_addr",
waybill."receive_mobile",
waybill."receive_tel",
waybill."cdt",
waybill."udt",
waybill."remark",
record."sw_id",
swarehouse."name" AS sw_name,
scompany."company_name" AS sw_company_name,
record."ew_id",
ewarehouse."name" AS ew_name,
ecompany."company_name" AS ew_company_name,
tool."id" AS tt_id,
tool."license_plate" AS tt_name,
record."route_id",
concat(route."start_station", route."end_station") AS route_name
FROM "tbl_waybill" waybill
LEFT JOIN "tbl_codes" orderchannelcodes ON orderchannelcodes."type" = 18 AND waybill."order_channel_id" = orderchannelcodes."code"
LEFT JOIN "tbl_customer" customer ON waybill."cid" = customer."id"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" = 16 AND customer."type" = customercodes."code"
LEFT JOIN "tbl_courier" courier ON waybill."eid" = courier."id"
LEFT JOIN "tbl_dot" dot ON courier."dot_id" = dot."id"
LEFT JOIN "tbl_areas" area ON area."id" = dot."manage_area_id"
LEFT JOIN "tbl_transport_record" record ON record."pw_waybill_number" = waybill."waybill_number"
LEFT JOIN "tbl_warehouse" swarehouse ON swarehouse."id" = record."sw_id"
LEFT JOIN "tbl_warehouse" ewarehouse ON ewarehouse."id" = record."ew_id"
LEFT JOIN "tbl_transport_tool" tool ON tool."id" = record."transport_tool_id"
LEFT JOIN "tbl_route" route ON record."route_id" = route."id"
LEFT JOIN "tbl_company_warehouse_map" swarehouse_map ON record."sw_id" = swarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" scompany ON scompany."id" = swarehouse_map."company_id"
LEFT JOIN "tbl_company_warehouse_map" ewarehouse_map ON record."ew_id" = ewarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" ecompany ON ecompany."id" = ewarehouse_map."company_id"
ORDER BY waybill."id" ASC

3. Spark implementation

Initialize the environment

Initialize the environment for the waybill detail widening job.

Code (Scala):
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object WayBillDWD extends OfflineApp {
  //name of the application
  val appName = this.getClass.getSimpleName

  /**
   * entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) load the fact table and dimension tables from Kudu and cache them
     * 4) join the fact table with the dimension tables to widen the waybill data
     * 5) write the widened data into the Kudu DWD table (creating it if it does not exist)
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */

    //TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //process the data
    execute(sparkSession)
  }

  /**
   * data processing
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

Load the waybill-related table data and cache it

  • When loading the waybill table, a date condition must be applied: the waybill topic will eventually be scheduled by Azkaban to run once a day over the incremental data, so the date must be specified.
  • Determine whether this is the first run; on the first run, load the full data set (including historical data).
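The first-run switch described above lives inside the project's `getKuduSource` helper, whose source is not shown here; the sketch below is only an assumption of how such a helper could apply the incremental/full-load condition (the `kudu.master` address and the `cdt` creation-time column are illustrative assumptions):

```scala
// Hypothetical sketch: full load on the first run, otherwise yesterday's increment.
// Assumes the kudu-spark data source and a "cdt" creation-time column.
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.{DataFrame, SparkSession}

def getKuduSourceSketch(spark: SparkSession, tableName: String, isFirstRun: Boolean): DataFrame = {
  val df = spark.read
    .format("kudu")
    .option("kudu.master", "cdh01:7051") // assumed master address
    .option("kudu.table", tableName)
    .load()

  if (isFirstRun) {
    df // first run: load everything, including historical data
  } else {
    // later runs: keep only rows created yesterday, matched on the creation date prefix
    val yesterday = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    df.where(s"cdt like '$yesterday%'")
  }
}
```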
Code (Scala):
//TODO 3) load the fact table and dimension tables from Kudu (and cache the loaded data)
//3.1: load the waybill fact table
val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2) //cache the data on the disks of two nodes, so a single-node failure does not lose the cached data

//3.2: load the courier table
val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)

//3.3: load the dot table
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//3.4: load the area table
val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)

//3.5: load the transport record table
val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)

//3.6: load the start warehouse table
val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//3.7: load the destination warehouse table
val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//3.8: load the transport tool (vehicle) table
val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)

//3.9: load the route table
val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)

//3.10: load the company-warehouse mapping table for the start warehouse
val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.11: load the company-warehouse mapping table for the destination warehouse
val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.12: load the company table for the start warehouse
val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//3.13: load the company table for the destination warehouse
val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//3.14: load the logistics code table
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

//3.15: load the customer table
val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)

//import implicit conversions
import sparkSession.implicits._

//order channel type codes
val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
  $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))

//customer type codes
val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
  $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))

Define the table relationships

  • To make it easy for the DWS-layer job to fetch each day's incremental waybill data (by date), a day column is added dynamically to the DataFrame, formatted as yyyyMMdd.

The code is as follows:

Code (Scala):
//TODO 4) define the joins between the fact table and the dimension tables
val left_outer = "left_outer"
val wayBillDetailDF = wayBillDF
  .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) //join the waybill table to the courier table
  .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) //join the courier table to the dot table
  .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) //join the dot table to the area table
  .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) //join the transport record table to the waybill table
  .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) //join the start warehouse to the transport record table
  .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) //join the destination warehouse to the transport record table
  .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) //join the transport tool table to the transport record table
  .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) //join the route table to the transport record table
  .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) //join the start warehouse to the company-warehouse mapping table
  .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) //join the company table to the start warehouse's mapping row
  .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) //join the destination warehouse to the company-warehouse mapping table
  .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) //join the company table to the destination warehouse's mapping row
  .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) //join the waybill table to the customer table
  .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) //join the order channel codes to the waybill table
  .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) //join the customer type codes to the customer table
  .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) //add the day column
  .sort(wayBillDF.col("cdt").asc) //sort by the waybill creation time, ascending
  .select(
    wayBillDF("id"), //waybill id
    wayBillDF("expressBillNumber").as("express_bill_number"), //express bill number
    wayBillDF("waybillNumber").as("waybill_number"), //waybill number
    wayBillDF("cid"), //customer id
    customerDF("name").as("cname"), //customer name
    customerDF("type").as("ctype"), //customer type
    customerTypeDF("customerTypeName").as("ctype_name"), //customer type name
    wayBillDF("eid"), //courier id
    courierDF("name").as("ename"), //courier name
    dotDF("id").as("dot_id"), //dot id
    dotDF("dotName").as("dot_name"), //dot name
    areasDF("id").as("area_id"), //area id
    areasDF("name").as("area_name"), //area name
    wayBillDF("orderChannelId").as("order_channel_id"), //channel id
    orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), //channel name
    wayBillDF("orderDt").as("order_dt"), //order time
    wayBillDF("orderTerminalType").as("order_terminal_type"), //order terminal type
    wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), //order terminal OS type
    wayBillDF("reserveDt").as("reserve_dt"), //scheduled pickup time
    wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), //whether pickup timed out
    wayBillDF("pkgId").as("pkg_id"), //package id
    wayBillDF("pkgNumber").as("pkg_number"), //package number
    wayBillDF("timeoutDt").as("timeout_dt"), //timeout time
    wayBillDF("transformType").as("transform_type"), //transport type
    wayBillDF("deliveryAddr").as("delivery_addr"), //sender address
    wayBillDF("deliveryCustomerName").as("delivery_customer_name"), //sender name
    wayBillDF("deliveryMobile").as("delivery_mobile"), //sender mobile number
    wayBillDF("deliveryTel").as("delivery_tel"), //sender telephone number
    wayBillDF("receiveAddr").as("receive_addr"), //receiver address
    wayBillDF("receiveCustomerName").as("receive_customer_name"), //receiver name
    wayBillDF("receiveMobile").as("receive_mobile"), //receiver mobile number
    wayBillDF("receiveTel").as("receive_tel"), //receiver telephone number
    wayBillDF("cdt"), //creation time
    wayBillDF("udt"), //update time
    wayBillDF("remark"), //waybill remark
    recordDF("swId").as("sw_id"), //start warehouse id
    startWarehouseDF("name").as("sw_name"), //start warehouse name
    startCompanyDF("id").as("sw_company_id"), //start warehouse's company id
    startCompanyDF("companyName").as("sw_company_name"), //start warehouse's company name
    recordDF("ewId").as("ew_id"), //next-stop warehouse id
    endWarehouseDF("name").as("ew_name"), //next-stop warehouse name
    endCompanyDF("id").as("ew_company_id"), //next-stop company id
    endCompanyDF("companyName").as("ew_company_name"), //next-stop company name
    toolDF("id").as("tt_id"), //transport tool id
    toolDF("licensePlate").as("tt_name"), //transport tool license plate
    recordDF("routeId").as("route_id"), //route id
    functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"), //route start and end stations combined
    $"day"
  )

Create the waybill detail wide table and write the detail data into Kudu

The wide-table data is stored in Kudu, so the waybill detail wide table does not exist the first time the widening job runs. The job therefore has to check automatically whether the table exists and create it if it does not.

Implementation:

  • call the save method from the WayBillDWD singleton object
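The existence check described above is hidden inside the project's `save` method, whose source is not shown in this section; the following is only a sketch of what such a method could look like, built on the kudu-spark `KuduContext` API (the master address, key column, and partitioning chosen here are assumptions for illustration):

```scala
// Sketch: create the target Kudu table if it is missing, then append the rows.
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

def saveToKudu(df: DataFrame, tableName: String, kuduMaster: String): Unit = {
  val kuduContext = new KuduContext(kuduMaster, df.sparkSession.sparkContext)
  if (!kuduContext.tableExists(tableName)) {
    kuduContext.createTable(
      tableName,
      df.schema,
      Seq("id"),                                  // primary key column (assumed)
      new CreateTableOptions()
        .addHashPartitions(List("id").asJava, 3)  // simple hash partitioning (assumed)
        .setNumReplicas(1)
    )
  }
  kuduContext.insertRows(df, tableName)           // append the widened rows
}
```

Note that Kudu requires primary-key columns to be non-nullable, so a real implementation would also have to adjust the nullability of the key column in `df.schema` before creating the table.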
Code (Scala):
//TODO 5) write the widened data back into Kudu (the DWD detail layer)
save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)

Remove the cached data

To release resources, the cached source-table data should be removed once the waybill wide-table computation has finished.

Code (Scala):
//TODO 6) remove the cached data
wayBillDF.unpersist()
courierDF.unpersist()
dotDF.unpersist()
areasDF.unpersist()
recordDF.unpersist()
startWarehouseDF.unpersist()
endWarehouseDF.unpersist()
toolDF.unpersist()
routeDF.unpersist()
startCompanyWarehouseDF.unpersist()
startCompanyDF.unpersist()
endCompanyWarehouseDF.unpersist()
endCompanyDF.unpersist()
customerDF.unpersist()
orderChannelTypeDF.unpersist()
customerTypeDF.unpersist()

Complete code

Code (Scala):
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.storage.StorageLevel

object WayBillDWD extends OfflineApp {
  //name of the application
  val appName = this.getClass.getSimpleName

  /**
   * entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) load the fact table and dimension tables from Kudu and cache them
     * 4) join the fact table with the dimension tables to widen the waybill data
     * 5) write the widened data into the Kudu DWD table (creating it if it does not exist)
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */

    //TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //process the data
    execute(sparkSession)
  }

  /**
   * data processing
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3) load the fact table and dimension tables from Kudu (and cache the loaded data)
    //3.1: load the waybill fact table
    val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2) //cache the data on the disks of two nodes, so a single-node failure does not lose the cached data

    //3.2: load the courier table
    val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)

    //3.3: load the dot table
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

    //3.4: load the area table
    val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)

    //3.5: load the transport record table
    val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)

    //3.6: load the start warehouse table
    val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

    //3.7: load the destination warehouse table
    val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

    //3.8: load the transport tool (vehicle) table
    val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)

    //3.9: load the route table
    val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)

    //3.10: load the company-warehouse mapping table for the start warehouse
    val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

    //3.11: load the company-warehouse mapping table for the destination warehouse
    val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

    //3.12: load the company table for the start warehouse
    val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

    //3.13: load the company table for the destination warehouse
    val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

    //3.14: load the logistics code table
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

    //3.15: load the customer table
    val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)

    //import implicit conversions
    import sparkSession.implicits._

    //order channel type codes
    val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
      $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))

    //customer type codes
    val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
      $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))

    //TODO 4) define the joins between the fact table and the dimension tables
    val left_outer = "left_outer"
    val wayBillDetailDF = wayBillDF
      .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) //join the waybill table to the courier table
      .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) //join the courier table to the dot table
      .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) //join the dot table to the area table
      .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) //join the transport record table to the waybill table
      .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) //join the start warehouse to the transport record table
      .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) //join the destination warehouse to the transport record table
      .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) //join the transport tool table to the transport record table
      .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) //join the route table to the transport record table
      .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) //join the start warehouse to the company-warehouse mapping table
      .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) //join the company table to the start warehouse's mapping row
      .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) //join the destination warehouse to the company-warehouse mapping table
      .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) //join the company table to the destination warehouse's mapping row
      .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) //join the waybill table to the customer table
      .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) //join the order channel codes to the waybill table
      .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) //join the customer type codes to the customer table
      .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) //add the day column
      .sort(wayBillDF.col("cdt").asc) //sort by the waybill creation time, ascending
      .select(
        wayBillDF("id"), //waybill id
        wayBillDF("expressBillNumber").as("express_bill_number"), //express bill number
        wayBillDF("waybillNumber").as("waybill_number"), //waybill number
        wayBillDF("cid"), //customer id
        customerDF("name").as("cname"), //customer name
        customerDF("type").as("ctype"), //customer type
        customerTypeDF("customerTypeName").as("ctype_name"), //customer type name
        wayBillDF("eid"), //courier id
        courierDF("name").as("ename"), //courier name
        dotDF("id").as("dot_id"), //dot id
        dotDF("dotName").as("dot_name"), //dot name
        areasDF("id").as("area_id"), //area id
        areasDF("name").as("area_name"), //area name
        wayBillDF("orderChannelId").as("order_channel_id"), //channel id
        orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), //channel name
        wayBillDF("orderDt").as("order_dt"), //order time
        wayBillDF("orderTerminalType").as("order_terminal_type"), //order terminal type
        wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), //order terminal OS type
        wayBillDF("reserveDt").as("reserve_dt"), //scheduled pickup time
        wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), //whether pickup timed out
        wayBillDF("pkgId").as("pkg_id"), //package id
        wayBillDF("pkgNumber").as("pkg_number"), //package number
        wayBillDF("timeoutDt").as("timeout_dt"), //timeout time
        wayBillDF("transformType").as("transform_type"), //transport type
        wayBillDF("deliveryAddr").as("delivery_addr"), //sender address
        wayBillDF("deliveryCustomerName").as("delivery_customer_name"), //sender name
        wayBillDF("deliveryMobile").as("delivery_mobile"), //sender mobile number
        wayBillDF("deliveryTel").as("delivery_tel"), //sender telephone number
        wayBillDF("receiveAddr").as("receive_addr"), //receiver address
        wayBillDF("receiveCustomerName").as("receive_customer_name"), //receiver name
        wayBillDF("receiveMobile").as("receive_mobile"), //receiver mobile number
        wayBillDF("receiveTel").as("receive_tel"), //receiver telephone number
        wayBillDF("cdt"), //creation time
        wayBillDF("udt"), //update time
        wayBillDF("remark"), //waybill remark
        recordDF("swId").as("sw_id"), //start warehouse id
        startWarehouseDF("name").as("sw_name"), //start warehouse name
        startCompanyDF("id").as("sw_company_id"), //start warehouse's company id
        startCompanyDF("companyName").as("sw_company_name"), //start warehouse's company name
        recordDF("ewId").as("ew_id"), //next-stop warehouse id
        endWarehouseDF("name").as("ew_name"), //next-stop warehouse name
        endCompanyDF("id").as("ew_company_id"), //next-stop company id
        endCompanyDF("companyName").as("ew_company_name"), //next-stop company name
        toolDF("id").as("tt_id"), //transport tool id
        toolDF("licensePlate").as("tt_name"), //transport tool license plate
        recordDF("routeId").as("route_id"), //route id
        functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"), //route start and end stations combined
        $"day"
      )

    //TODO 5) write the widened data back into Kudu (the DWD detail layer)
    save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)

    //TODO 6) remove the cached data
    wayBillDF.unpersist()
    courierDF.unpersist()
    dotDF.unpersist()
    areasDF.unpersist()
    recordDF.unpersist()
    startWarehouseDF.unpersist()
    endWarehouseDF.unpersist()
    toolDF.unpersist()
    routeDF.unpersist()
    startCompanyWarehouseDF.unpersist()
    startCompanyDF.unpersist()
    endCompanyWarehouseDF.unpersist()
    endCompanyDF.unpersist()
    customerDF.unpersist()
    orderChannelTypeDF.unpersist()
    customerTypeDF.unpersist()

    sparkSession.stop()
  }
}

4. Test and verify

Start WayBillDWD.

V. Waybill Metric Computation

1. Computed fields

Field | Description
id | time the data was generated
totalWaybillCount | total waybill count
maxAreaTotalCount | maximum waybill count among areas
minAreaTotalCount | minimum waybill count among areas
avgAreaTotalCount | average waybill count per area
maxCompanyTotalCount | maximum waybill count among companies
minCompanyTotalCount | minimum waybill count among companies
avgCompanyTotalCount | average waybill count per company
maxDotTotalCount | maximum waybill count among dots
minDotTotalCount | minimum waybill count among dots
avgDotTotalCount | average waybill count per dot
maxRouteTotalCount | maximum waybill count among routes
minRouteTotalCount | minimum waybill count among routes
avgRouteTotalCount | average waybill count per route
maxTransportToolTotalCount | maximum waybill count among transport tools
minTransportToolTotalCount | minimum waybill count among transport tools
avgTransportToolTotalCount | average waybill count per transport tool
maxCtypeTotalCount | maximum waybill count among customer types
minCtypeTotalCount | minimum waybill count among customer types
avgCtypeTotalCount | average waybill count per customer type

2. Spark implementation

Steps:

  • create the WayBillDWS singleton object in the dws package, extending the OfflineApp trait
  • initialize the environment parameters and create the SparkSession object
  • load the incremental data of the widened waybill table (tbl_waybill_detail) for the specified date, and cache it
  • determine whether this is the first run; on the first run, load the full data set (including historical data)
  • compute the metrics:
    • total waybill count
    • maximum, minimum, and average waybill count per area
    • maximum, minimum, and average waybill count per company
    • maximum, minimum, and average waybill count per dot
    • maximum, minimum, and average waybill count per route
    • maximum, minimum, and average waybill count per transport tool
    • maximum, minimum, and average waybill count per customer type
    • get the current time as yyyyMMddHH
  • build the metric rows to persist (checking whether each computed metric has a value, and substituting a default value if it does not)
  • build the schema with StructType
  • create the waybill metric table (skipped if it already exists)
  • persist the metric data to the Kudu table
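The per-dimension max/min/avg pattern repeated in the steps above boils down to grouping by a dimension key, counting per key, and then aggregating the counts. A Spark-free Scala sketch of the same idea (the sample area ids are made up):

```scala
// Group waybills by a dimension key, count per key, then take max/min/avg of the counts.
val waybillAreas = Seq("A1", "A1", "A2", "A3", "A3", "A3") // one made-up area id per waybill

val countsPerArea: Map[String, Long] =
  waybillAreas.groupBy(identity).map { case (area, ws) => area -> ws.size.toLong }

val maxAreaTotalCount = countsPerArea.values.max                      // 3
val minAreaTotalCount = countsPerArea.values.min                      // 1
val avgAreaTotalCount = countsPerArea.values.sum / countsPerArea.size // 2 (truncated to a whole number, as in the job)

println(s"max=$maxAreaTotalCount min=$minAreaTotalCount avg=$avgAreaTotalCount")
```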

Initialize the environment

Code (Scala):
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * waybill metric computation
 */
object WayBillDWS extends OfflineApp {
  //name of the application
  val appName = this.getClass.getSimpleName

  /**
   * entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) read the waybill detail wide-table data
     * 4) compute the metrics on the wide-table data
     * 5) write the computed metric data into Kudu
     * 5.1: define the schema of the metric result table
     * 5.2: assemble the data to write into the Kudu table
     * 5.3: create the metric result table if it does not exist
     * 5.4: write the data into the Kudu table
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */

    //TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //process the data
    execute(sparkSession)
  }

  /**
   * data processing
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

Load the incremental wide-table data and cache it

When loading the waybill wide table, a date condition must be applied: the waybill topic will eventually be scheduled by Azkaban to run once a day over the incremental data, so the date must be specified.

Code (Scala):
//TODO 3) read the waybill detail wide-table data
val wayBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.wayBillDetail, Configuration.isFirstRunnable).toDF()

Metric computation

Code (Scala):
//group the wide-table data by day
val wayBillDetailGroupByDayDF: DataFrame = wayBillDetailDF.select("day").groupBy("day").count().cache()

//import implicit conversions
import sparkSession.implicits._

//collection of the computed metric result rows
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

//read a numeric cell out of a row as Long, substituting the default 0 when the value is null
def safeLong(row: Row, index: Int): Long =
  if (row == null || row.isNullAt(index)) 0L else row.get(index).asInstanceOf[Number].longValue()

//iterate over the days
wayBillDetailGroupByDayDF.collect().foreach(row => {
  //the day whose data is being processed
  val day: String = row.getAs[String](0)
  //the waybill detail data of that day
  val wayBillDetailByDayDF: DataFrame = wayBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

  //TODO 4) compute the metrics on the wide-table data
  //total waybill count
  val totalWayBillCount: Long = wayBillDetailByDayDF.agg(count("waybill_number")).first().getLong(0)

  //waybill count per area (columns: area_id, first(area_id), areaTotalCount, so the count sits at index 2)
  val areaCountDF: DataFrame = wayBillDetailByDayDF.groupBy("area_id").agg(first($"area_id"), count($"id").alias("areaTotalCount")).cache()
  //maximum waybill count among areas
  val maxAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".desc).first()
  //minimum waybill count among areas
  val minAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".asc).first()
  //average waybill count across areas
  val avgAreaTotalCount: Row = areaCountDF.agg(avg($"areaTotalCount")).first()

  //waybill count per company
  val companyCountDF: DataFrame = wayBillDetailByDayDF.groupBy("sw_company_id").agg(first($"sw_company_id"), count($"id").alias("companyTotalCount")).cache()
  //maximum waybill count among companies
  val maxCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".desc).first()
  //minimum waybill count among companies
  val minCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".asc).first()
  //average waybill count across companies
  val avgCompanyTotalCount: Row = companyCountDF.agg(avg($"companyTotalCount")).first()

  //waybill count per dot
  val dotCountDF: DataFrame = wayBillDetailByDayDF.groupBy("dot_id").agg(first($"dot_id"), count($"id").alias("dotTotalCount")).cache()
  //maximum waybill count among dots
  val maxDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".desc).first()
  //minimum waybill count among dots
  val minDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".asc).first()
  //average waybill count across dots
  val avgDotTotalCount: Row = dotCountDF.agg(avg($"dotTotalCount")).first()

  //waybill count per route
  val routeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("route_id").agg(first($"route_id"), count($"id").alias("routeTotalCount")).cache()
  //maximum waybill count among routes
  val maxRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".desc).first()
  //minimum waybill count among routes
  val minRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".asc).first()
  //average waybill count across routes
  val avgRouteTotalCount: Row = routeCountDF.agg(avg($"routeTotalCount")).first()

  //waybill count per transport tool
  val ttCountDF: DataFrame = wayBillDetailByDayDF.groupBy("tt_id").agg(first($"tt_id"), count($"id").alias("ttTotalCount")).cache()
  //maximum waybill count among transport tools
  val maxToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".desc).first()
  //minimum waybill count among transport tools
  val minToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".asc).first()
  //average waybill count across transport tools
  val avgToolTotalCount: Row = ttCountDF.agg(avg($"ttTotalCount")).first()

  //waybill count per customer type
  val cTypeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("ctype").agg(first($"ctype"), count($"id").alias("cTypeTotalCount")).cache()
  //maximum waybill count among customer types
  val maxCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".desc).first()
  //minimum waybill count among customer types
  val minCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".asc).first()
  //average waybill count across customer types
  val avgCTypeTotalCount: Row = cTypeCountDF.agg(avg($"cTypeTotalCount")).first()

  //5.2: assemble the data to write into the Kudu table
  //for the max/min rows the count sits at index 2 (after the group key and the first(...) column);
  //for the avg rows it is the only column, at index 0
  val rowInfo: Row = Row(
    day,
    totalWayBillCount,
    safeLong(maxAreaTotalCount, 2),
    safeLong(minAreaTotalCount, 2),
    safeLong(avgAreaTotalCount, 0),
    safeLong(maxCompanyTotalCount, 2),
    safeLong(minCompanyTotalCount, 2),
    safeLong(avgCompanyTotalCount, 0),
    safeLong(maxDotTotalCount, 2),
    safeLong(minDotTotalCount, 2),
    safeLong(avgDotTotalCount, 0),
    safeLong(maxRouteTotalCount, 2),
    safeLong(minRouteTotalCount, 2),
    safeLong(avgRouteTotalCount, 0),
    safeLong(maxToolTotalCount, 2),
    safeLong(minToolTotalCount, 2),
    safeLong(avgToolTotalCount, 0),
    safeLong(maxCTypeTotalCount, 2),
    safeLong(minCTypeTotalCount, 2),
    safeLong(avgCTypeTotalCount, 0)
  )

  rows.append(rowInfo)
  println(rowInfo)

  //release resources
  wayBillDetailByDayDF.unpersist()
  areaCountDF.unpersist()
  companyCountDF.unpersist()
  dotCountDF.unpersist()
  routeCountDF.unpersist()
  ttCountDF.unpersist()
  cTypeCountDF.unpersist()
})

Build the schema with StructType

Code (Scala):
//TODO 5) write the computed metric data into Kudu
//5.1: define the schema of the metric result table
val schema: StructType = StructType(Array(
  StructField("id", StringType, true, Metadata.empty),
  StructField("totalWaybillCount", LongType, true, Metadata.empty),
  StructField("maxAreaTotalCount", LongType, true, Metadata.empty),
  StructField("minAreaTotalCount", LongType, true, Metadata.empty),
  StructField("avgAreaTotalCount", LongType, true, Metadata.empty),
  StructField("maxCompanyTotalCount", LongType, true, Metadata.empty),
  StructField("minCompanyTotalCount", LongType, true, Metadata.empty),
  StructField("avgCompanyTotalCount", LongType, true, Metadata.empty),
  StructField("maxDotTotalCount", LongType, true, Metadata.empty),
  StructField("minDotTotalCount", LongType, true, Metadata.empty),
  StructField("avgDotTotalCount", LongType, true, Metadata.empty),
  StructField("maxRouteTotalCount", LongType, true, Metadata.empty),
  StructField("minRouteTotalCount", LongType, true, Metadata.empty),
  StructField("avgRouteTotalCount", LongType, true, Metadata.empty),
  StructField("maxTransportToolTotalCount", LongType, true, Metadata.empty),
  StructField("minTransportToolTotalCount", LongType, true, Metadata.empty),
  StructField("avgTransportToolTotalCount", LongType, true, Metadata.empty),
  StructField("maxCtypeTotalCount", LongType, true, Metadata.empty),
  StructField("minCtypeTotalCount", LongType, true, Metadata.empty),
  StructField("avgCtypeTotalCount", LongType, true, Metadata.empty)
))

Persist the metrics data to the Kudu table

//5.2: create a DataFrame from the collected rows and the schema
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

//write the data to the Kudu database
save(quotaDF, OfflineTableDefine.wayBillSummary)
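The `save` helper comes from the project's `OfflineApp` base class and is not shown here. As a rough, hypothetical sketch of what such a writer can look like with the kudu-spark connector (the `format("kudu")` shorthand, the master address parameter, and the append mode are assumptions, not the project's actual implementation):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical Kudu writer; kuduMaster (e.g. "node1:7051") is an assumed parameter.
def saveToKudu(df: DataFrame, tableName: String, kuduMaster: String): Unit = {
  df.write
    .format("kudu")                    // requires the kudu-spark integration on the classpath
    .option("kudu.master", kuduMaster) // comma-separated Kudu master addresses
    .option("kudu.table", tableName)   // the target table must already exist in Kudu
    .mode("append")                    // appends/upserts rows into the existing table
    .save()
}
```

The project's actual `save` additionally creates the result table when it does not yet exist (step 5.3 of the plan).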

Complete code

package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{avg, col, count, first}
import org.apache.spark.sql.types.{LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
 * Waybill metrics computation
 */
object WayBillDWS extends OfflineApp {
  //application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Implementation steps:
     * 1) Create the SparkConf object
     * 2) Create the SparkSession object
     * 3) Read the waybill detail wide table
     * 4) Compute the metrics from the waybill detail wide table
     * 5) Write the computed metrics to the Kudu database
     * 5.1: define the schema of the metrics result table
     * 5.2: assemble the rows to be written to the Kudu table
     * 5.3: create the metrics result table if it does not exist
     * 5.4: write the data to the Kudu table
     * 6) Remove cached data
     * 7) Stop the job and exit the SparkSession
     */

    //TODO 1) Create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2) Create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3) Read the waybill detail wide table
    val wayBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.wayBillDetail, Configuration.isFirstRunnable).toDF()

    //group the waybill details by day
    val wayBillDetailGroupByDayDF: DataFrame = wayBillDetailDF.select("day").groupBy("day").count().cache()

    //import implicit conversions
    import sparkSession.implicits._

    //collection holding the computed metric rows
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

    //iterate over each day
    wayBillDetailGroupByDayDF.collect().foreach(row => {
      //the day being processed
      val day: String = row.getAs[String](0)
      //waybill detail data for that day
      val wayBillDetailByDayDF: DataFrame = wayBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

      //TODO 4) Compute the metrics from the waybill detail wide table
      //total waybill count
      val totalWayBillCount: Long = wayBillDetailByDayDF.agg(count("waybill_number")).first().getLong(0)

      //waybill count per area
      val areaCountDF: DataFrame = wayBillDetailByDayDF.groupBy("area_id").agg(first($"area_id"), count($"id").alias("areaTotalCount")).cache()
      //maximum waybill count across areas
      val maxAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".desc).first()
      //minimum waybill count across areas
      val minAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".asc).first()
      //average waybill count across areas
      val avgAreaTotalCount: Row = areaCountDF.agg(avg($"areaTotalCount")).first()

      //waybill count per company
      val companyCountDF: DataFrame = wayBillDetailByDayDF.groupBy("sw_company_id").agg(first($"sw_company_id"), count($"id").alias("companyTotalCount")).cache()
      //maximum waybill count across companies
      val maxCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".desc).first()
      //minimum waybill count across companies
      val minCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".asc).first()
      //average waybill count across companies
      val avgCompanyTotalCount: Row = companyCountDF.agg(avg($"companyTotalCount")).first()

      //waybill count per dot
      val dotCountDF: DataFrame = wayBillDetailByDayDF.groupBy("dot_id").agg(first($"dot_id"), count($"id").alias("dotTotalCount")).cache()
      //maximum waybill count across dots
      val maxDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".desc).first()
      //minimum waybill count across dots
      val minDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".asc).first()
      //average waybill count across dots
      val avgDotTotalCount: Row = dotCountDF.agg(avg($"dotTotalCount")).first()

      //waybill count per route
      val routeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("route_id").agg(first($"route_id"), count($"id").alias("routeTotalCount")).cache()
      //maximum waybill count across routes
      val maxRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".desc).first()
      //minimum waybill count across routes
      val minRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".asc).first()
      //average waybill count across routes
      val avgRouteTotalCount: Row = routeCountDF.agg(avg($"routeTotalCount")).first()

      //waybill count per transport tool
      val ttCountDF: DataFrame = wayBillDetailByDayDF.groupBy("tt_id").agg(first($"tt_id"), count($"id").alias("ttTotalCount")).cache()
      //maximum waybill count across transport tools
      val maxToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".desc).first()
      //minimum waybill count across transport tools
      val minToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".asc).first()
      //average waybill count across transport tools
      val avgToolTotalCount: Row = ttCountDF.agg(avg($"ttTotalCount")).first()

      //waybill count per customer type
      val cTypeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("ctype").agg(first($"ctype"), count($"id").alias("cTypeTotalCount")).cache()
      //maximum waybill count across customer types
      val maxCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".desc).first()
      //minimum waybill count across customer types
      val minCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".asc).first()
      //average waybill count across customer types
      val avgCTypeTotalCount: Row = cTypeCountDF.agg(avg($"cTypeTotalCount")).first()

      //5.2: assemble the row to be written to the Kudu table
      val rowInfo: Row = Row(
        day,
        totalWayBillCount,
        if (maxAreaTotalCount.isNullAt(0)) 0L else maxAreaTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minAreaTotalCount.isNullAt(0)) 0L else minAreaTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgAreaTotalCount.isNullAt(0)) 0L else avgAreaTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (maxCompanyTotalCount.isNullAt(0)) 0L else maxCompanyTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minCompanyTotalCount.isNullAt(0)) 0L else minCompanyTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgCompanyTotalCount.isNullAt(0)) 0L else avgCompanyTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (maxDotTotalCount.isNullAt(0)) 0L else maxDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minDotTotalCount.isNullAt(0)) 0L else minDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgDotTotalCount.isNullAt(0)) 0L else avgDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (maxRouteTotalCount.isNullAt(0)) 0L else maxRouteTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minRouteTotalCount.isNullAt(0)) 0L else minRouteTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgRouteTotalCount.isNullAt(0)) 0L else avgRouteTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (maxToolTotalCount.isNullAt(0)) 0L else maxToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minToolTotalCount.isNullAt(0)) 0L else minToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgToolTotalCount.isNullAt(0)) 0L else avgToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (maxCTypeTotalCount.isNullAt(0)) 0L else maxCTypeTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (minCTypeTotalCount.isNullAt(0)) 0L else minCTypeTotalCount.get(0).asInstanceOf[Number].longValue(),
        if (avgCTypeTotalCount.isNullAt(0)) 0L else avgCTypeTotalCount.get(0).asInstanceOf[Number].longValue()
      )

      rows.append(rowInfo)
      println(rowInfo)

      //release cached resources
      wayBillDetailByDayDF.unpersist()
      areaCountDF.unpersist()
      companyCountDF.unpersist()
      dotCountDF.unpersist()
      routeCountDF.unpersist()
      ttCountDF.unpersist()
      cTypeCountDF.unpersist()
    })

    //TODO 5) Write the computed metrics to the Kudu database
    //5.1: define the schema of the metrics result table
    val schema: StructType = StructType(Array(
      StructField("id", StringType, true, Metadata.empty),
      StructField("totalWaybillCount", LongType, true, Metadata.empty),
      StructField("maxAreaTotalCount", LongType, true, Metadata.empty),
      StructField("minAreaTotalCount", LongType, true, Metadata.empty),
      StructField("avgAreaTotalCount", LongType, true, Metadata.empty),
      StructField("maxCompanyTotalCount", LongType, true, Metadata.empty),
      StructField("minCompanyTotalCount", LongType, true, Metadata.empty),
      StructField("avgCompanyTotalCount", LongType, true, Metadata.empty),
      StructField("maxDotTotalCount", LongType, true, Metadata.empty),
      StructField("minDotTotalCount", LongType, true, Metadata.empty),
      StructField("avgDotTotalCount", LongType, true, Metadata.empty),
      StructField("maxRouteTotalCount", LongType, true, Metadata.empty),
      StructField("minRouteTotalCount", LongType, true, Metadata.empty),
      StructField("avgRouteTotalCount", LongType, true, Metadata.empty),
      StructField("maxTransportToolTotalCount", LongType, true, Metadata.empty),
      StructField("minTransportToolTotalCount", LongType, true, Metadata.empty),
      StructField("avgTransportToolTotalCount", LongType, true, Metadata.empty),
      StructField("maxCtypeTotalCount", LongType, true, Metadata.empty),
      StructField("minCtypeTotalCount", LongType, true, Metadata.empty),
      StructField("avgCtypeTotalCount", LongType, true, Metadata.empty)
    ))

    //5.2: create a DataFrame from the collected rows and the schema
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

    //write the data to the Kudu database
    save(quotaDF, OfflineTableDefine.wayBillSummary)

    //TODO 6) remove cached data
    wayBillDetailGroupByDayDF.unpersist()

    sparkSession.stop()
  }
}

3. Testing and verification

Run WayBillDWS and check the console output and the Kudu result table.
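Once the job completes, the metrics can be spot-checked by reading the summary table back. A minimal sketch, assuming the same kudu-spark connector and an assumed master address:

```scala
// Hypothetical read-back of the summary table; the master address is an assumption.
val summaryDF = sparkSession.read
  .format("kudu")
  .option("kudu.master", "node1:7051") // assumed Kudu master address
  .option("kudu.table", OfflineTableDefine.wayBillSummary)
  .load()

// Expect one row per day: id (the day), totalWaybillCount, and the max/min/avg metrics.
summaryDF.show(false)
```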
