Waybill Topic
I. Background
"A waybill is evidence of the contract of carriage and a receipt showing that the carrier has taken over the goods. Each waybill covers one shipper, one consignee, one port of departure and one port of destination. If goods from the same shipper are consigned to two or more consignees at the destination, separate waybills must be issued."
Waybill statistics are aggregated along dimensions such as region, company, dot (outlet), route and transport tool, and waybill counts can be ranked within each dimension. For example, counting waybills per dot reflects how that dot is operating, while counting waybills per route shows the capacity utilization of each route.
II. Metric Details
Metric | Dimension |
---|---|
Waybill count | Total waybill count |
Maximum waybill count | Maximum waybill count per region |
 | Maximum waybill count per branch company |
 | Maximum waybill count per dot |
 | Maximum waybill count per route |
 | Maximum waybill count per transport tool |
 | Maximum waybill count per customer type |
Minimum waybill count | Minimum waybill count per region |
 | Minimum waybill count per branch company |
 | Minimum waybill count per dot |
 | Minimum waybill count per route |
 | Minimum waybill count per transport tool |
 | Minimum waybill count per customer type |
Average waybill count | Average waybill count per region |
 | Average waybill count per branch company |
 | Average waybill count per dot |
 | Average waybill count per route |
 | Average waybill count per transport tool |
 | Average waybill count per customer type |
III. Table Relationships
1. Fact table
Table | Description |
---|---|
tbl_waybill | Waybill table |
2. Dimension tables
Table | Description |
---|---|
tbl_courier | Courier table |
tbl_transport_record | Transport record table |
tbl_areas | Area table |
tbl_route | Route table |
tbl_dot | Dot (outlet) table |
tbl_company | Company table |
tbl_warehouse | Warehouse table |
tbl_customer | Customer table |
tbl_company_warehouse_map | Company-warehouse mapping table |
tbl_transport_tool | Vehicle (transport tool) table |
tbl_codes | Logistics system code table |
3. Join relationships
The joins between the waybill fact table and the dimension tables follow the keys used in the SQL statement in section IV below.
IV. Waybill Data Widening
1. Fields after widening
Table | Field | Alias | Description |
---|---|---|---|
tbl_waybill | id | id | Waybill ID |
tbl_waybill | expressBillNumber | express_bill_number | Express bill number |
tbl_waybill | waybillNumber | waybill_number | Waybill number |
tbl_waybill | cid | cid | Customer ID |
tbl_customer | name | cname | Customer name |
tbl_codes | consumerType | ctype | Customer type code |
tbl_codes | consumerTypeName | ctype_name | Customer type name |
tbl_waybill | eid | eid | Collecting courier ID |
tbl_courier | name | ename | Collecting courier name |
tbl_dot | id | dot_id | Dot ID |
tbl_dot | dotName | dot_name | Dot name |
tbl_areas | id | area_id | Area ID |
tbl_areas | name | area_name | Area name |
tbl_waybill | orderChannelId | order_channel_id | Order channel ID |
tbl_codes | orderChannelName | order_channel_name | Order channel name |
tbl_waybill | orderDt | order_dt | Order time |
tbl_waybill | orderTerminalType | order_terminal_type | Order terminal type ID |
tbl_waybill | orderTerminalOsType | order_terminal_os_type | Order terminal OS type ID |
tbl_waybill | reserveDt | reserve_dt | Scheduled pickup time |
tbl_waybill | isCollectPackageTimeout | is_collect_package_timeout | Whether pickup timed out |
tbl_waybill | pkgId | pkg_id | Package ID |
tbl_waybill | pkgNumber | pkg_number | Package number |
tbl_waybill | timeoutDt | timeout_dt | Timeout time |
tbl_waybill | transformType | transform_type | Transport type |
tbl_waybill | deliveryCustomerName | delivery_customer_name | Sender name |
tbl_waybill | deliveryAddr | delivery_addr | Sender address |
tbl_waybill | deliveryMobile | delivery_mobile | Sender mobile number |
tbl_waybill | deliveryTel | delivery_tel | Sender telephone number |
tbl_waybill | receiveCustomerName | receive_customer_name | Receiver name |
tbl_waybill | receiveAddr | receive_addr | Receiver address |
tbl_waybill | receiveMobile | receive_mobile | Receiver mobile number |
tbl_waybill | receiveTel | receive_tel | Receiver telephone number |
tbl_waybill | cdt | cdt | Creation time |
tbl_waybill | udt | udt | Update time |
tbl_waybill | remark | remark | Waybill remark |
tbl_transport_record | swId | sw_id | Departure warehouse ID |
tbl_warehouse | name | sw_name | Departure warehouse name |
tbl_company | companyName | sw_company_name | Departure company name |
tbl_transport_record | ewId | ew_id | Next-stop warehouse ID |
tbl_warehouse | name | ew_name | Next-stop warehouse name |
tbl_company | companyName | ew_company_name | Next-stop company name |
tbl_transport_tool | id | tt_id | Transport tool ID |
tbl_transport_tool | licensePlate | tt_name | Transport tool license plate |
tbl_transport_record | routeId | route_id | Route ID |
tbl_route | start_station end_station | route_name | Concatenation of route start and end stations |
tbl_waybill | yyyyMMdd(cdt) | day | Creation date in yyyyMMdd format |
2. SQL statement
Note: the original statement omitted the four receive_* columns listed in the field table above; they are included here.
SELECT
  waybill."id",
  waybill."express_bill_number",
  waybill."waybill_number",
  waybill."cid",
  customer."name" AS cname,
  customercodes."code" AS ctype,
  customercodes."code_desc" AS ctype_name,
  waybill."eid",
  courier."name" AS ename,
  dot."id" AS dot_id,
  dot."dot_name",
  area."id" AS area_id,
  area."name" AS area_name,
  waybill."order_channel_id",
  orderchannelcodes."code_desc" AS order_channel_name,
  waybill."order_dt",
  waybill."order_terminal_type",
  waybill."order_terminal_os_type",
  waybill."reserve_dt",
  waybill."is_collect_package_timeout",
  waybill."pkg_id",
  waybill."pkg_number",
  waybill."timeout_dt",
  waybill."transform_type",
  waybill."delivery_customer_name",
  waybill."delivery_addr",
  waybill."delivery_mobile",
  waybill."delivery_tel",
  waybill."receive_customer_name",
  waybill."receive_addr",
  waybill."receive_mobile",
  waybill."receive_tel",
  waybill."cdt",
  waybill."udt",
  waybill."remark",
  record."sw_id",
  swarehouse."name" AS sw_name,
  scompany."company_name" AS sw_company_name,
  record."ew_id",
  ewarehouse."name" AS ew_name,
  ecompany."company_name" AS ew_company_name,
  tool."id" AS tt_id,
  tool."license_plate" AS tt_name,
  record."route_id",
  concat(route."start_station", route."end_station") AS route_name
FROM "tbl_waybill" waybill
LEFT JOIN "tbl_codes" orderchannelcodes ON orderchannelcodes."type" = 18 AND waybill."order_channel_id" = orderchannelcodes."code"
LEFT JOIN "tbl_customer" customer ON waybill."cid" = customer."id"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" = 16 AND customer."type" = customercodes."code"
LEFT JOIN "tbl_courier" courier ON waybill."eid" = courier."id"
LEFT JOIN "tbl_dot" dot ON courier."dot_id" = dot."id"
LEFT JOIN "tbl_areas" area ON area."id" = dot."manage_area_id"
LEFT JOIN "tbl_transport_record" record ON record."pw_waybill_number" = waybill."waybill_number"
LEFT JOIN "tbl_warehouse" swarehouse ON swarehouse."id" = record."sw_id"
LEFT JOIN "tbl_warehouse" ewarehouse ON ewarehouse."id" = record."ew_id"
LEFT JOIN "tbl_transport_tool" tool ON tool."id" = record."transport_tool_id"
LEFT JOIN "tbl_route" route ON record."route_id" = route."id"
LEFT JOIN "tbl_company_warehouse_map" swarehouse_map ON record."sw_id" = swarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" scompany ON scompany."id" = swarehouse_map."company_id"
LEFT JOIN "tbl_company_warehouse_map" ewarehouse_map ON record."ew_id" = ewarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" ecompany ON ecompany."id" = ewarehouse_map."company_id"
ORDER BY waybill."id" ASC
3. Spark implementation
Initialize the environment
Initialize the environment for the waybill detail widening job.
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object WayBillDWD extends OfflineApp {
  // application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) load the fact and dimension tables from Kudu (and cache them)
     * 4) join the dimension tables onto the waybill fact table (widening)
     * 5) write the widened data back to Kudu (DWD layer), creating the table if it does not exist
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */
    // TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    // TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}
Load the waybill-related table data and cache it
- When loading the waybill table, a date condition must be applied: the waybill topic is ultimately scheduled by Azkaban and runs once a day on incremental data, so the load must be restricted by date.
- Check whether this is the first run; if so, load the full data set (including history).
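The first-run decision above can be sketched independently of the project's Configuration and SparkUtils helpers. This is a minimal sketch; `loadPredicate` and the string form of the filter are hypothetical, not the project's actual Kudu read API:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Returns the filter a daily incremental read would apply (hypothetical form):
// full history on the first run, otherwise only yesterday's day partition.
def loadPredicate(isFirstRun: Boolean, today: LocalDate): String = {
  if (isFirstRun) "1 = 1" // full load, including history
  else {
    val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
    s"day = '${today.minusDays(1).format(fmt)}'" // previous day's increment
  }
}
```

The same two-branch rule is what `Configuration.isFirstRunnable` drives in the `getKuduSource` calls below.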
// TODO 3) load the fact and dimension tables from Kudu (and cache the loaded data)
// 3.1: load the waybill fact table
val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2) // cache on the disks of two nodes, so a single-node failure does not lose the cached data
// 3.2: load the courier table
val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)
// 3.3: load the dot table
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
// 3.4: load the area table
val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)
// 3.5: load the transport record table
val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)
// 3.6: load the departure warehouse table
val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
// 3.7: load the arrival warehouse table
val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
// 3.8: load the vehicle table
val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)
// 3.9: load the route table
val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)
// 3.10: load the departure company-warehouse mapping table
val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
// 3.11: load the arrival company-warehouse mapping table
val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
// 3.12: load the departure warehouse's company table
val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
// 3.13: load the arrival warehouse's company table
val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
// 3.14: load the logistics code table
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
// 3.15: load the customer table
val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
// import implicit conversions
import sparkSession.implicits._
// order channel type codes
val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
  $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))
// customer type codes
val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
  $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))
Define the join relationships
- To make it easy for the DWS job to fetch each day's incremental waybill data (by date), a day column is added to the DataFrame, formatted as yyyyMMdd.
The code is as follows:
// TODO 4) define the joins between the fact table and the dimension tables
val left_outer = "left_outer"
val wayBillDetailDF = wayBillDF
  .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) // join the waybill table to the courier table
  .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) // join the courier table to the dot table
  .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) // join the dot table to the area table
  .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) // join the transport record table to the waybill table
  .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) // join the departure warehouse to the transport record
  .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) // join the arrival warehouse to the transport record
  .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) // join the transport tool table to the transport record
  .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) // join the route table to the transport record
  .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) // join the company-warehouse mapping to the departure warehouse
  .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) // join the company table to the departure mapping
  .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) // join the company-warehouse mapping to the arrival warehouse
  .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) // join the company table to the arrival mapping
  .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) // join the customer table to the waybill table
  .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) // join the order channel codes to the waybill table
  .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) // join the customer type codes to the customer table
  .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) // add the day column
  .sort(wayBillDF.col("cdt").asc) // sort by the waybill creation time ascending
  .select(
    wayBillDF("id"), // waybill ID
    wayBillDF("expressBillNumber").as("express_bill_number"), // express bill number
    wayBillDF("waybillNumber").as("waybill_number"), // waybill number
    wayBillDF("cid"), // customer ID
    customerDF("name").as("cname"), // customer name
    customerDF("type").as("ctype"), // customer type
    customerTypeDF("customerTypeName").as("ctype_name"), // customer type name
    wayBillDF("eid"), // courier ID
    courierDF("name").as("ename"), // courier name
    dotDF("id").as("dot_id"), // dot ID
    dotDF("dotName").as("dot_name"), // dot name
    areasDF("id").as("area_id"), // area ID
    areasDF("name").as("area_name"), // area name
    wayBillDF("orderChannelId").as("order_channel_id"), // channel ID
    orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), // channel name
    wayBillDF("orderDt").as("order_dt"), // order time
    wayBillDF("orderTerminalType").as("order_terminal_type"), // order terminal type
    wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), // order terminal OS type
    wayBillDF("reserveDt").as("reserve_dt"), // scheduled pickup time
    wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), // whether pickup timed out
    wayBillDF("pkgId").as("pkg_id"), // package ID
    wayBillDF("pkgNumber").as("pkg_number"), // package number
    wayBillDF("timeoutDt").as("timeout_dt"), // timeout time
    wayBillDF("transformType").as("transform_type"), // transport type
    wayBillDF("deliveryAddr").as("delivery_addr"),
    wayBillDF("deliveryCustomerName").as("delivery_customer_name"),
    wayBillDF("deliveryMobile").as("delivery_mobile"),
    wayBillDF("deliveryTel").as("delivery_tel"),
    wayBillDF("receiveAddr").as("receive_addr"),
    wayBillDF("receiveCustomerName").as("receive_customer_name"),
    wayBillDF("receiveMobile").as("receive_mobile"),
    wayBillDF("receiveTel").as("receive_tel"),
    wayBillDF("cdt"),
    wayBillDF("udt"),
    wayBillDF("remark"),
    recordDF("swId").as("sw_id"),
    startWarehouseDF("name").as("sw_name"),
    startCompanyDF("id").as("sw_company_id"),
    startCompanyDF("companyName").as("sw_company_name"),
    recordDF("ewId").as("ew_id"),
    endWarehouseDF("name").as("ew_name"),
    endCompanyDF("id").as("ew_company_id"),
    endCompanyDF("companyName").as("ew_company_name"),
    toolDF("id").as("tt_id"),
    toolDF("licensePlate").as("tt_name"),
    recordDF("routeId").as("route_id"),
    functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"),
    $"day"
  )
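The day column is simply `date_format(cdt, "yyyyMMdd")`. On a plain timestamp value, the same derivation looks like this (a minimal sketch using java.time, outside Spark; `dayOf` is an illustrative helper, not project code):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Mirrors Spark's date_format(cdt, "yyyyMMdd") for a single timestamp:
// the creation time collapses to a day-granularity partition key.
def dayOf(cdt: LocalDateTime): String =
  cdt.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
```

Partitioning the wide table by this key is what lets the DWS job select one day's increment with a single equality filter.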
Create the waybill detail wide table and write the detail data to Kudu
The wide-table data is stored in Kudu, so on the first run of the waybill widening job the wide table does not yet exist. The job therefore has to check whether the table exists and create it if it does not.
Implementation:
- Call the save method inside the execute method of the WayBillDWD singleton object
// TODO 5) write the widened data back to Kudu (DWD detail layer)
save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)
Remove cached data
To release resources, the cached source-table data should be removed once the wide-table computation has finished.
// TODO 6) remove the cached data
wayBillDF.unpersist()
courierDF.unpersist()
dotDF.unpersist()
areasDF.unpersist()
recordDF.unpersist()
startWarehouseDF.unpersist()
endWarehouseDF.unpersist()
toolDF.unpersist()
routeDF.unpersist()
startCompanyWarehouseDF.unpersist()
startCompanyDF.unpersist()
endCompanyWarehouseDF.unpersist()
endCompanyDF.unpersist()
customerDF.unpersist()
orderChannelTypeDF.unpersist()
customerTypeDF.unpersist()
Complete code
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.storage.StorageLevel

object WayBillDWD extends OfflineApp {
  // application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) load the fact and dimension tables from Kudu (and cache them)
     * 4) join the dimension tables onto the waybill fact table (widening)
     * 5) write the widened data back to Kudu (DWD layer), creating the table if it does not exist
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */
    // TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    // TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // TODO 3) load the fact and dimension tables from Kudu (and cache the loaded data)
    // 3.1: load the waybill fact table
    val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2) // cache on the disks of two nodes, so a single-node failure does not lose the cached data
    // 3.2: load the courier table
    val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.3: load the dot table
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.4: load the area table
    val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.5: load the transport record table
    val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.6: load the departure warehouse table
    val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.7: load the arrival warehouse table
    val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.8: load the vehicle table
    val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.9: load the route table
    val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.10: load the departure company-warehouse mapping table
    val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.11: load the arrival company-warehouse mapping table
    val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.12: load the departure warehouse's company table
    val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.13: load the arrival warehouse's company table
    val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.14: load the logistics code table
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    // 3.15: load the customer table
    val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
    // import implicit conversions
    import sparkSession.implicits._
    // order channel type codes
    val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
      $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))
    // customer type codes
    val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
      $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))
    // TODO 4) define the joins between the fact table and the dimension tables
    val left_outer = "left_outer"
    val wayBillDetailDF = wayBillDF
      .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) // join the waybill table to the courier table
      .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) // join the courier table to the dot table
      .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) // join the dot table to the area table
      .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) // join the transport record table to the waybill table
      .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) // join the departure warehouse to the transport record
      .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) // join the arrival warehouse to the transport record
      .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) // join the transport tool table to the transport record
      .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) // join the route table to the transport record
      .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) // join the company-warehouse mapping to the departure warehouse
      .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) // join the company table to the departure mapping
      .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) // join the company-warehouse mapping to the arrival warehouse
      .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) // join the company table to the arrival mapping
      .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) // join the customer table to the waybill table
      .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) // join the order channel codes to the waybill table
      .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) // join the customer type codes to the customer table
      .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) // add the day column
      .sort(wayBillDF.col("cdt").asc) // sort by the waybill creation time ascending
      .select(
        wayBillDF("id"), // waybill ID
        wayBillDF("expressBillNumber").as("express_bill_number"), // express bill number
        wayBillDF("waybillNumber").as("waybill_number"), // waybill number
        wayBillDF("cid"), // customer ID
        customerDF("name").as("cname"), // customer name
        customerDF("type").as("ctype"), // customer type
        customerTypeDF("customerTypeName").as("ctype_name"), // customer type name
        wayBillDF("eid"), // courier ID
        courierDF("name").as("ename"), // courier name
        dotDF("id").as("dot_id"), // dot ID
        dotDF("dotName").as("dot_name"), // dot name
        areasDF("id").as("area_id"), // area ID
        areasDF("name").as("area_name"), // area name
        wayBillDF("orderChannelId").as("order_channel_id"), // channel ID
        orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), // channel name
        wayBillDF("orderDt").as("order_dt"), // order time
        wayBillDF("orderTerminalType").as("order_terminal_type"), // order terminal type
        wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), // order terminal OS type
        wayBillDF("reserveDt").as("reserve_dt"), // scheduled pickup time
        wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), // whether pickup timed out
        wayBillDF("pkgId").as("pkg_id"), // package ID
        wayBillDF("pkgNumber").as("pkg_number"), // package number
        wayBillDF("timeoutDt").as("timeout_dt"), // timeout time
        wayBillDF("transformType").as("transform_type"), // transport type
        wayBillDF("deliveryAddr").as("delivery_addr"),
        wayBillDF("deliveryCustomerName").as("delivery_customer_name"),
        wayBillDF("deliveryMobile").as("delivery_mobile"),
        wayBillDF("deliveryTel").as("delivery_tel"),
        wayBillDF("receiveAddr").as("receive_addr"),
        wayBillDF("receiveCustomerName").as("receive_customer_name"),
        wayBillDF("receiveMobile").as("receive_mobile"),
        wayBillDF("receiveTel").as("receive_tel"),
        wayBillDF("cdt"),
        wayBillDF("udt"),
        wayBillDF("remark"),
        recordDF("swId").as("sw_id"),
        startWarehouseDF("name").as("sw_name"),
        startCompanyDF("id").as("sw_company_id"),
        startCompanyDF("companyName").as("sw_company_name"),
        recordDF("ewId").as("ew_id"),
        endWarehouseDF("name").as("ew_name"),
        endCompanyDF("id").as("ew_company_id"),
        endCompanyDF("companyName").as("ew_company_name"),
        toolDF("id").as("tt_id"),
        toolDF("licensePlate").as("tt_name"),
        recordDF("routeId").as("route_id"),
        functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"),
        $"day"
      )
    // TODO 5) write the widened data back to Kudu (DWD detail layer)
    save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)
    // TODO 6) remove the cached data
    wayBillDF.unpersist()
    courierDF.unpersist()
    dotDF.unpersist()
    areasDF.unpersist()
    recordDF.unpersist()
    startWarehouseDF.unpersist()
    endWarehouseDF.unpersist()
    toolDF.unpersist()
    routeDF.unpersist()
    startCompanyWarehouseDF.unpersist()
    startCompanyDF.unpersist()
    endCompanyWarehouseDF.unpersist()
    endCompanyDF.unpersist()
    customerDF.unpersist()
    orderChannelTypeDF.unpersist()
    customerTypeDF.unpersist()
    sparkSession.stop()
  }
}
4. Test and verify
Start WayBillDWD.
V. Waybill Metric Computation
1. Computed fields
Field | Description |
---|---|
id | Date the data was generated |
totalWaybillCount | Total waybill count |
maxAreaTotalCount | Maximum waybill count per region |
minAreaTotalCount | Minimum waybill count per region |
avgAreaTotalCount | Average waybill count per region |
maxCompanyTotalCount | Maximum waybill count per branch company |
minCompanyTotalCount | Minimum waybill count per branch company |
avgCompanyTotalCount | Average waybill count per branch company |
maxDotTotalCount | Maximum waybill count per dot |
minDotTotalCount | Minimum waybill count per dot |
avgDotTotalCount | Average waybill count per dot |
maxRouteTotalCount | Maximum waybill count per route |
minRouteTotalCount | Minimum waybill count per route |
avgRouteTotalCount | Average waybill count per route |
maxTransportToolTotalCount | Maximum waybill count per transport tool |
minTransportToolTotalCount | Minimum waybill count per transport tool |
avgTransportToolTotalCount | Average waybill count per transport tool |
maxCtypeTotalCount | Maximum waybill count per customer type |
minCtypeTotalCount | Minimum waybill count per customer type |
avgCtypeTotalCount | Average waybill count per customer type |
2. Spark implementation
Steps:
- Create the WayBillDWS singleton object in the dws package, extending the OfflineApp trait
- Initialize the environment parameters and create the SparkSession object
- Load the incremental data of the widened waybill table (tbl_waybill_detail) for the given date, and cache it
- Check whether this is the first run; if so, load the full data set (including history)
- Compute the metrics
  - total waybill count
  - maximum waybill count per region
  - minimum waybill count per region
  - average waybill count per region
  - maximum waybill count per branch company
  - minimum waybill count per branch company
  - average waybill count per branch company
  - maximum waybill count per dot
  - minimum waybill count per dot
  - average waybill count per dot
  - maximum waybill count per route
  - minimum waybill count per route
  - average waybill count per route
  - maximum waybill count per transport tool
  - minimum waybill count per transport tool
  - average waybill count per transport tool
  - maximum waybill count per customer type
  - minimum waybill count per customer type
  - average waybill count per customer type
- Get the current time in yyyyMMddHH format
- Build the metric rows to persist (check whether each computed metric has a value; fall back to a default if not)
- Build the target schema with StructType
- Create the waybill metric table (skipped if it already exists)
- Persist the metric data to the Kudu table
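All the max/min/average metrics in the list above follow one pattern: group by a dimension, count waybills per group, then aggregate the per-group counts. A minimal sketch of that pattern on plain Scala collections (the dimension keys and counts are made up; the integer average truncates, matching the `longValue()` conversion used later when the result row is built):

```scala
// Given per-dimension waybill counts (e.g. waybills per area for one day),
// derive (maximum, minimum, truncated average) across the groups.
// An empty day yields the (0, 0, 0) defaults the job would persist.
def summarize(counts: Map[String, Long]): (Long, Long, Long) = {
  if (counts.isEmpty) (0L, 0L, 0L)
  else {
    val values = counts.values
    (values.max, values.min, values.sum / values.size)
  }
}
```

In the Spark job the same three numbers come from `orderBy(...).first()` for max/min and `agg(avg(...))` for the average, once per dimension.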
Initialize the environment
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Waybill metric computation
 */
object WayBillDWS extends OfflineApp {
  // application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) create the SparkConf object
     * 2) create the SparkSession object
     * 3) read the waybill detail wide table data
     * 4) compute the metrics on the wide-table data
     * 5) write the computed metrics to Kudu
     *    5.1: define the schema of the metric result table
     *    5.2: assemble the rows to write to the Kudu table
     *    5.3: create the metric result table if it does not exist
     *    5.4: write the data into the Kudu table
     * 6) remove the cached data
     * 7) stop the job and exit the SparkSession
     */
    // TODO 1) create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    // TODO 2) create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}
Load the incremental wide-table data and cache it
When loading the waybill wide table, a date condition must be applied: the waybill topic is ultimately scheduled by Azkaban and runs once a day on incremental data, so the load must be restricted by date.
// TODO 3) read the waybill detail wide table data
val wayBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.wayBillDetail, Configuration.isFirstRunnable).toDF()
Metric computation
// group the wide-table data by day
val wayBillDetailGroupByDayDF: DataFrame = wayBillDetailDF.select("day").groupBy("day").count().cache()
// import implicit conversions
import sparkSession.implicits._
// collection holding the computed metric rows
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
// iterate over the days
wayBillDetailGroupByDayDF.collect().foreach(row => {
  // the day being processed
  val day: String = row.getAs[String](0)
  // the waybill detail data for that day
  val wayBillDetailByDayDF: DataFrame = wayBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
  // TODO 4) compute the metrics on the wide-table data
  // total waybill count
  val totalWayBillCount: Long = wayBillDetailByDayDF.agg(count("waybill_number")).first().getLong(0)
  // waybill count per area
  val areaCountDF: DataFrame = wayBillDetailByDayDF.groupBy("area_id").agg(first($"area_id"), count($"id").alias("areaTotalCount")).cache()
  // maximum waybill count among areas
  val maxAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".desc).first()
  // minimum waybill count among areas
  val minAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".asc).first()
  // average waybill count across areas
  val avgAreaTotalCount: Row = areaCountDF.agg(avg($"areaTotalCount")).first()
  // waybill count per company
  val companyCountDF: DataFrame = wayBillDetailByDayDF.groupBy("sw_company_id").agg(first($"sw_company_id"), count($"id").alias("companyTotalCount")).cache()
  // maximum waybill count among companies
  val maxCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".desc).first()
  // minimum waybill count among companies
  val minCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".asc).first()
  // average waybill count across companies
  val avgCompanyTotalCount: Row = companyCountDF.agg(avg($"companyTotalCount")).first()
  // waybill count per dot
  val dotCountDF: DataFrame = wayBillDetailByDayDF.groupBy("dot_id").agg(first($"dot_id"), count($"id").alias("dotTotalCount")).cache()
  // maximum waybill count among dots
  val maxDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".desc).first()
  // minimum waybill count among dots
  val minDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".asc).first()
  // average waybill count across dots
  val avgDotTotalCount: Row = dotCountDF.agg(avg($"dotTotalCount")).first()
  // waybill count per route
  val routeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("route_id").agg(first($"route_id"), count($"id").alias("routeTotalCount")).cache()
  // maximum waybill count among routes
  val maxRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".desc).first()
  // minimum waybill count among routes
  val minRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".asc).first()
  // average waybill count across routes
  val avgRouteTotalCount: Row = routeCountDF.agg(avg($"routeTotalCount")).first()
  // waybill count per transport tool
  val ttCountDF: DataFrame = wayBillDetailByDayDF.groupBy("tt_id").agg(first($"tt_id"), count($"id").alias("ttTotalCount")).cache()
  // maximum waybill count among transport tools
  val maxToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".desc).first()
  // minimum waybill count among transport tools
  val minToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".asc).first()
  // average waybill count across transport tools
  val avgToolTotalCount: Row = ttCountDF.agg(avg($"ttTotalCount")).first()
  // waybill count per customer type
  val cTypeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("ctype").agg(first($"ctype"), count($"id").alias("cTypeTotalCount")).cache()
  // maximum waybill count among customer types
  val maxCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".desc).first()
  // minimum waybill count among customer types
  val minCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".asc).first()
  // average waybill count across customer types
  val avgCTypeTotalCount: Row = cTypeCountDF.agg(avg($"cTypeTotalCount")).first()
  // 5.2: assemble the row to write to the Kudu table
  // note: in the max/min rows the count column sits at index 2 (after the
  // group key and first(...)); the avg rows have a single column at index 0
  val rowInfo: Row = Row(
    day,
    totalWayBillCount,
    if (maxAreaTotalCount.isNullAt(2)) 0L else maxAreaTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minAreaTotalCount.isNullAt(2)) 0L else minAreaTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgAreaTotalCount.isNullAt(0)) 0L else avgAreaTotalCount.get(0).asInstanceOf[Number].longValue(),
    if (maxCompanyTotalCount.isNullAt(2)) 0L else maxCompanyTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minCompanyTotalCount.isNullAt(2)) 0L else minCompanyTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgCompanyTotalCount.isNullAt(0)) 0L else avgCompanyTotalCount.get(0).asInstanceOf[Number].longValue(),
    if (maxDotTotalCount.isNullAt(2)) 0L else maxDotTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minDotTotalCount.isNullAt(2)) 0L else minDotTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgDotTotalCount.isNullAt(0)) 0L else avgDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if (maxRouteTotalCount.isNullAt(2)) 0L else maxRouteTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minRouteTotalCount.isNullAt(2)) 0L else minRouteTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgRouteTotalCount.isNullAt(0)) 0L else avgRouteTotalCount.get(0).asInstanceOf[Number].longValue(),
    if (maxToolTotalCount.isNullAt(2)) 0L else maxToolTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minToolTotalCount.isNullAt(2)) 0L else minToolTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgToolTotalCount.isNullAt(0)) 0L else avgToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if (maxCTypeTotalCount.isNullAt(2)) 0L else maxCTypeTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (minCTypeTotalCount.isNullAt(2)) 0L else minCTypeTotalCount.get(2).asInstanceOf[Number].longValue(),
    if (avgCTypeTotalCount.isNullAt(0)) 0L else avgCTypeTotalCount.get(0).asInstanceOf[Number].longValue()
  )
  rows.append(rowInfo)
  println(rowInfo)
  // release resources
  wayBillDetailByDayDF.unpersist()
  areaCountDF.unpersist()
  companyCountDF.unpersist()
  dotCountDF.unpersist()
  routeCountDF.unpersist()
  ttCountDF.unpersist()
  cTypeCountDF.unpersist()
})
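The repeated `if (x.isNullAt(...)) 0L else x.get(...).asInstanceOf[Number].longValue()` guards above all implement one rule: when a day has no data for a dimension, fall back to 0. The same rule as a small Option-based helper (a sketch for illustration, not part of the project code):

```scala
// Converts a possibly-null numeric cell (Long from count, Double from avg)
// to a Long metric value, defaulting to 0 when the cell is null.
def longOrZero(v: Any): Long =
  Option(v).map(_.asInstanceOf[Number].longValue()).getOrElse(0L)
```

Going through `Number` keeps one helper working for both the Long counts and the Double averages, at the cost of truncating the fractional part of an average.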
Build the schema with StructType
// TODO 5) write the computed metrics to Kudu
// 5.1: define the schema of the metric result table
val schema: StructType = StructType(Array(
StructField("id", StringType, true, Metadata.empty),
StructField("totalWaybillCount", LongType, true, Metadata.empty),
StructField("maxAreaTotalCount", LongType, true, Metadata.empty),
StructField("minAreaTotalCount", LongType, true, Metadata.empty),
StructField("avgAreaTotalCount", LongType, true, Metadata.empty),
StructField("maxCompanyTotalCount", LongType, true, Metadata.empty),
StructField("minCompanyTotalCount", LongType, true, Metadata.empty),
StructField("avgCompanyTotalCount", LongType, true, Metadata.empty),
StructField("maxDotTotalCount", LongType, true, Metadata.empty),
StructField("minDotTotalCount", LongType, true, Metadata.empty),
StructField("avgDotTotalCount", LongType, true, Metadata.empty),
StructField("maxRouteTotalCount", LongType, true, Metadata.empty),
StructField("minRouteTotalCount", LongType, true, Metadata.empty),
StructField("avgRouteTotalCount", LongType, true, Metadata.empty),
StructField("maxTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("minTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("avgTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("maxCtypeTotalCount", LongType, true, Metadata.empty),
StructField("minCtypeTotalCount", LongType, true, Metadata.empty),
StructField("avgCtypeTotalCount", LongType, true, Metadata.empty)
))
Persist the metric data to the Kudu table
// 5.2: build a DataFrame from the RDD of rows and the schema
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
// write the data to Kudu
save(quotaDF, OfflineTableDefine.wayBillSummary)
完整代码
代码语言:javascript复制package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{avg, col, count, first}
import org.apache.spark.sql.types.{LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
/**
* Waybill metric development
*/
object WayBillDWS extends OfflineApp{
//define the application name
val appName = this.getClass.getSimpleName
/**
* Entry point
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* Implementation steps:
* 1) create a SparkConf object
* 2) create a SparkSession object
* 3) read the waybill detail wide table
* 4) compute the metrics on the waybill detail wide table
* 5) write the computed metric data to the Kudu database
* 5.1: define the schema of the metric result table
* 5.2: assemble the data to write to the Kudu table
* 5.3: create the metric result table if it does not exist
* 5.4: write the data to the Kudu table
* 6) remove the cached data
* 7) stop the job and exit the SparkSession
*/
//TODO 1) create a SparkConf object
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//TODO 2) create a SparkSession object
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//process the data
execute(sparkSession)
}
/**
* Data processing
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3) read the waybill detail wide table
val wayBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.wayBillDetail, Configuration.isFirstRunnable).toDF()
//group the waybills by day
val wayBillDetailGroupByDayDF: DataFrame = wayBillDetailDF.select("day").groupBy("day").count().cache()
//import implicit conversions
import sparkSession.implicits._
//collection that holds the computed metric rows
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//iterate over the days
wayBillDetailGroupByDayDF.collect().foreach(row => {
//get the day whose data is being processed
val day: String = row.getAs[String](0)
//waybill detail data for the given day
val wayBillDetailByDayDF: DataFrame = wayBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
//TODO 4) compute the metrics on the waybill detail wide table
//total waybill count
val totalWayBillCount: Long = wayBillDetailByDayDF.agg(count("waybill_number")).first().getLong(0)
//waybill count per area
val areaCountDF: DataFrame = wayBillDetailByDayDF.groupBy("area_id").agg(first($"area_id"), count($"id").alias("areaTotalCount")).cache()
//maximum waybill count across areas
val maxAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".desc).first()
//minimum waybill count across areas
val minAreaTotalCount: Row = areaCountDF.orderBy($"areaTotalCount".asc).first()
//average waybill count across areas
val avgAreaTotalCount: Row = areaCountDF.agg(avg($"areaTotalCount")).first()
//waybill count per company
val companyCountDF: DataFrame = wayBillDetailByDayDF.groupBy("sw_company_id").agg(first($"sw_company_id"), count($"id").alias("companyTotalCount")).cache()
//maximum waybill count across companies
val maxCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".desc).first()
//minimum waybill count across companies
val minCompanyTotalCount: Row = companyCountDF.orderBy($"companyTotalCount".asc).first()
//average waybill count across companies
val avgCompanyTotalCount: Row = companyCountDF.agg(avg($"companyTotalCount")).first()
//waybill count per dot
val dotCountDF: DataFrame = wayBillDetailByDayDF.groupBy("dot_id").agg(first($"dot_id"), count($"id").alias("dotTotalCount")).cache()
//maximum waybill count across dots
val maxDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".desc).first()
//minimum waybill count across dots
val minDotTotalCount: Row = dotCountDF.orderBy($"dotTotalCount".asc).first()
//average waybill count across dots
val avgDotTotalCount: Row = dotCountDF.agg(avg($"dotTotalCount")).first()
//waybill count per route
val routeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("route_id").agg(first($"route_id"), count($"id").alias("routeTotalCount")).cache()
//maximum waybill count across routes
val maxRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".desc).first()
//minimum waybill count across routes
val minRouteTotalCount: Row = routeCountDF.orderBy($"routeTotalCount".asc).first()
//average waybill count across routes
val avgRouteTotalCount: Row = routeCountDF.agg(avg($"routeTotalCount")).first()
//waybill count per transport tool
val ttCountDF: DataFrame = wayBillDetailByDayDF.groupBy("tt_id").agg(first($"tt_id"), count($"id").alias("ttTotalCount")).cache()
//maximum waybill count across transport tools
val maxToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".desc).first()
//minimum waybill count across transport tools
val minToolTotalCount: Row = ttCountDF.orderBy($"ttTotalCount".asc).first()
//average waybill count across transport tools
val avgToolTotalCount: Row = ttCountDF.agg(avg($"ttTotalCount")).first()
//waybill count per customer type
val cTypeCountDF: DataFrame = wayBillDetailByDayDF.groupBy("ctype").agg(first($"ctype"), count($"id").alias("cTypeTotalCount")).cache()
//maximum waybill count across customer types
val maxCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".desc).first()
//minimum waybill count across customer types
val minCTypeTotalCount: Row = cTypeCountDF.orderBy($"cTypeTotalCount".asc).first()
//average waybill count across customer types
val avgCTypeTotalCount: Row = cTypeCountDF.agg(avg($"cTypeTotalCount")).first()
// 5.2: assemble the data to write to the Kudu table
//note: read the count column by its alias; index 0 of the max/min rows holds the
//group key (e.g. area_id), not the count
val rowInfo: Row = Row(
day,
totalWayBillCount,
maxAreaTotalCount.getAs[Long]("areaTotalCount"),
minAreaTotalCount.getAs[Long]("areaTotalCount"),
if (avgAreaTotalCount.isNullAt(0)) 0L else avgAreaTotalCount.get(0).asInstanceOf[Number].longValue(),
maxCompanyTotalCount.getAs[Long]("companyTotalCount"),
minCompanyTotalCount.getAs[Long]("companyTotalCount"),
if (avgCompanyTotalCount.isNullAt(0)) 0L else avgCompanyTotalCount.get(0).asInstanceOf[Number].longValue(),
maxDotTotalCount.getAs[Long]("dotTotalCount"),
minDotTotalCount.getAs[Long]("dotTotalCount"),
if (avgDotTotalCount.isNullAt(0)) 0L else avgDotTotalCount.get(0).asInstanceOf[Number].longValue(),
maxRouteTotalCount.getAs[Long]("routeTotalCount"),
minRouteTotalCount.getAs[Long]("routeTotalCount"),
if (avgRouteTotalCount.isNullAt(0)) 0L else avgRouteTotalCount.get(0).asInstanceOf[Number].longValue(),
maxToolTotalCount.getAs[Long]("ttTotalCount"),
minToolTotalCount.getAs[Long]("ttTotalCount"),
if (avgToolTotalCount.isNullAt(0)) 0L else avgToolTotalCount.get(0).asInstanceOf[Number].longValue(),
maxCTypeTotalCount.getAs[Long]("cTypeTotalCount"),
minCTypeTotalCount.getAs[Long]("cTypeTotalCount"),
if (avgCTypeTotalCount.isNullAt(0)) 0L else avgCTypeTotalCount.get(0).asInstanceOf[Number].longValue()
)
rows.append(rowInfo)
println(rowInfo)
//release the cached data
wayBillDetailByDayDF.unpersist()
areaCountDF.unpersist()
companyCountDF.unpersist()
dotCountDF.unpersist()
routeCountDF.unpersist()
ttCountDF.unpersist()
cTypeCountDF.unpersist()
})
//TODO 5) write the computed metric data to the Kudu database
//5.1: define the schema of the metric result table
val schema: StructType = StructType(Array(
StructField("id", StringType, true, Metadata.empty),
StructField("totalWaybillCount", LongType, true, Metadata.empty),
StructField("maxAreaTotalCount", LongType, true, Metadata.empty),
StructField("minAreaTotalCount", LongType, true, Metadata.empty),
StructField("avgAreaTotalCount", LongType, true, Metadata.empty),
StructField("maxCompanyTotalCount", LongType, true, Metadata.empty),
StructField("minCompanyTotalCount", LongType, true, Metadata.empty),
StructField("avgCompanyTotalCount", LongType, true, Metadata.empty),
StructField("maxDotTotalCount", LongType, true, Metadata.empty),
StructField("minDotTotalCount", LongType, true, Metadata.empty),
StructField("avgDotTotalCount", LongType, true, Metadata.empty),
StructField("maxRouteTotalCount", LongType, true, Metadata.empty),
StructField("minRouteTotalCount", LongType, true, Metadata.empty),
StructField("avgRouteTotalCount", LongType, true, Metadata.empty),
StructField("maxTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("minTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("avgTransportToolTotalCount", LongType, true, Metadata.empty),
StructField("maxCtypeTotalCount", LongType, true, Metadata.empty),
StructField("minCtypeTotalCount", LongType, true, Metadata.empty),
StructField("avgCtypeTotalCount", LongType, true, Metadata.empty)
))
//5.2: create a DataFrame from the RDD and the schema
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
//write the data to the Kudu database
save(quotaDF, OfflineTableDefine.wayBillSummary)
sparkSession.stop()
}
}
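The core of the job above is: group the day's waybills by a dimension, count per group, then take the max, min, and average of those counts. The same logic can be sketched in plain Scala collections (no Spark), using hypothetical sample data, which is handy for sanity-checking expected metric values:

```scala
// Plain-Scala sketch (no Spark) of the per-dimension aggregation: count
// waybills per area for one day, then derive max / min / truncated average.
// The (waybillId, areaId) pairs below are hypothetical sample data.
val waybills: Seq[(String, String)] = Seq(
  ("w1", "areaA"), ("w2", "areaA"), ("w3", "areaA"),
  ("w4", "areaB"), ("w5", "areaC"), ("w6", "areaC")
)

// analogue of groupBy("area_id").agg(count($"id"))
val areaCounts: Map[String, Long] =
  waybills.groupBy(_._2).map { case (area, ws) => area -> ws.size.toLong }

val maxAreaCount = areaCounts.values.max                      // areaA has 3
val minAreaCount = areaCounts.values.min                      // areaB has 1
val avgAreaCount = areaCounts.values.sum / areaCounts.size    // 6 / 3 = 2

println((maxAreaCount, minAreaCount, avgAreaCount))           // (3, 1, 2)
```

Like the Spark job, the average here is an integer (truncated) value; Spark's `avg` returns a `Double` that the job then coerces to `Long` before writing to Kudu.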
3. Testing and verification
Launch WayBillDWS and verify that the summary rows are written to the Kudu result table.