【Spark数仓项目】需求三:地图位置解析进一步优化

2024-07-25 15:47:46 浏览数 (2)

写在前面

Q1:为什么要这么做优化?即此文的目的。

因为全部调用高德API会造成高并发的问题,超出高德的调用规范,这也解释了为什么前面需求二我们只查找毕导用户。因此,在不给高德充钱的前提下,我们采用维表 高德api调用少数可以继续进行优化。

Q2:维表中的数据是什么?为什么还有结合高德api?

维表中的数据是全国的地理位置hash解析,是公开的,我们提前准备好的数据库资源。但是ods层实际用户坐标的地理hash可能有不在维表中的情况,因此有了本需求,即结合高德api完善维表中的信息。

本项目需求运行在Hadoop10单机环境:

  • Spark3.2.0
  • Flink1.13.6
  • Hadoop3.1.4
  • jdk1.8
  • Sqoop1.4.6
  • MySQL5.7
  • Hive3.1.2
  • Kafka0.11
  • Flume1.9.0
  • Zookeeper3.4.6
  • Hbase2.4
  • Redis6.2.0
  • Dlink0.7.3

Windows11 开发环境:

  • Idea 2020
  • Moba
  • DBeaver7.0.0
  • Scala2.12.17

项目中所需的表,工具类,可以在前文需求中找到。

一、地理位置解析需求优化

在需求二的地理位置解析需求中,我们暂时指定了一位叫毕导的用户测试。将其写入了dwd.event_log_detail表,在本需求中,我们将清空``dwd.event_log_detail` 表,重新优化代码。

需求二我们做的是:

代码语言:javascript复制
1.读取tmp.event_log_splited,计算经纬度的geohash
2.读取dim.area_geo
3.前边两步查询的结果进行join
处理:如果连接条件是geohash5,则产生数据膨胀问题

需求三,现在我们的优化是:

代码语言:javascript复制
1.如果用户上报的地理位置计算的geohash5在dim.area_geo维表中不存在
2.则调用高德api查询对应的省市区,并且插入dim.area_geo维表

二、需求代码编写

2.1 新的优化需求技术组件

2.1.1 需求util工具及bean

地图解析util:

代码语言:javascript复制
package com.yh.utils

import cn.hutool.json.{JSONObject, JSONUtil}
import org.apache.commons.lang3.StringUtils
import scalaj.http.{Http, HttpRequest}

object GaoUtils {


  def getLocaition(latitude:Double,longitude:Double) = {
    val request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
      .param("key", "1ee98f687f76d5428f279cd0dbc5ad85")
      .param("location", longitude "," latitude)

    val str: String = request.asString.body
   // print(str)

    if(!StringUtils.isBlank(str) && JSONUtil.isTypeJSON(str)){
      val jSONObject: JSONObject = JSONUtil.parseObj(str)
      if("1".equals(jSONObject.getStr("status"))){
        val jSONObject2: JSONObject = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
        val province = jSONObject2.getStr("province")
        val city = jSONObject2.getStr("city").replace("\[]","")
        val district = jSONObject2.getStr("district")
        province "," city "," district
      }else{
        "null,null,null"
      }
    }else{
      "null,null,null"
    }
  }

  def main(args: Array[String]): Unit = {
    val str = GaoUtils.getLocaition(35.727602, 111.256812)
    println(str)
  }
}

累加器util:

代码语言:javascript复制
package com.yh.acc

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable.ListBuffer

class CutmAccumulatorString extends AccumulatorV2[String, ListBuffer[String]] {

  val list = new ListBuffer[String]()

  // 当前的累加器是否为初始化状态,只需要判断一下list是否为空即可
  override def isZero: Boolean = list.isEmpty

  // 复制累加器对象
  override def copy(): AccumulatorV2[String, ListBuffer[String]] = {
    new CutmAccumulatorString
  }

  // 重置累加器对象
  override def reset(): Unit = {
    list.clear()
  }

  // 向累加器中增加值
  override def add(v: String): Unit = {
    list.append(v)   //list.add()
  }

  // 重写方法,实现自定义业务
  /*def add(v: String, t: String): Unit = {
    if (v.contains(t)) {
      list.append(v)
    }
  }*/

  // 合并累加器
  override def merge(other: AccumulatorV2[String, ListBuffer[String]]): Unit = {
    list.  =(other.value)  //list.addAll()
  }

  // 获取累加器的结果
  override def value: ListBuffer[String] = list

}

bean只用了一行代码存储位置

代码语言:javascript复制
package com.yh.bean

case class Location(prov:String,city:String,district:String)
2.1.2 地理数据解析主程序

com.yh.ods_etl.AppLogToDWD_03:

代码语言:javascript复制
我们将使用以下技术和组件:

Broadcast 变量:通过 Broadcast 变量,在集群中将 map 集合广播到每个 Spark Executor 节点上,以便在每个节点上使用该集合的副本,提高性能和效率。

累加器(Accumulator):代码中创建了一个自定义的字符串累加器 CutmAccumulatorString,用于收集特定条件下的数据,并在处理完成后获取累加器的值。

UDF(User-Defined Function):通过注册自定义的 UDF 函数 parse_city,在 Spark SQL 中进行经纬度解析和处理。该函数根据经纬度计算出对应的 Geohash5,并根据该 Geohash5 查询相应的省市区信息。如果该 Geohash5 在 dim.area_geo 维度表中不存在,则调用高德 API 查询对应的省市区信息,并插入到 dim.area_geo 维度表中。

DataFrame 操作:代码中使用了 DataFrame 的读取、转换和持久化操作。通过读取 dim.area_geo 维度表和临时表 tmp.event_log_splited,进行数据处理和计算,并最终将结果插入到 dwd.event_log_detail 表中。
代码语言:javascript复制
package com.yh.ods_etl

import ch.hsr.geohash.GeoHash
import com.yh.acc.CutmAccumulatorString
import com.yh.bean.Location
import com.yh.utils.{GaoUtils, SparkUtils}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

object AppLogToDWD_03 {

  def main(args: Array[String]): Unit = {
    if(args.length == 0){
      println("缺失参数")
      System.exit(0)
    }

    val spark = SparkUtils.getSparkSession("AppLogToDWD_03")
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val dt = args(0)


    val df: DataFrame = spark.read.table("dim.area_geo")   //1.读取dim.area_geo维度表
    val rdd = df.rdd.map(row => {
      val geohash5 = row.getAs[String]("geohash5")
      val prov = row.getAs[String]("prov")
      val city = row.getAs[String]("city")
      val district = row.getAs[String]("district")
      (geohash5, Location(prov, city, district))
    })
    val map = rdd.collectAsMap()  //将rdd转换成map集合  collectAsMap、collect
    val bc: Broadcast[collection.Map[String, Location]] = spark.sparkContext.broadcast(map)

    //val list = new ListBuffer[String]
    //1.定义一个累加器对象
    val acc1 = new CutmAccumulatorString
    //2.注册一个累加器
    spark.sparkContext.register(acc1)

    spark.udf.register("parse_city",(latitude:Double,longitude:Double) => {
      //计算用户上报经纬度的geohash
      val hash5 = GeoHash.geoHashStringWithCharacterPrecision(latitude, longitude, 5)
      //从map集合(Geohash字典)中获取对应的省市区
      val option = bc.value.get(hash5)
      if(option.isDefined){
        val value = option.get
        value.prov "," value.city "," value.district
      }else{
        //"null,null,null"
        val str: String = GaoUtils.getLocaition(latitude, longitude)
        //list.append(str s",${longitude},${latitude},${hash5}")
        //3.将元素添加到累加器
        acc1.add(str s",${longitude},${latitude},${hash5}")
        str
      }
    })


    val df2: DataFrame = spark.sql(
      s"""
         |	select
         |	   *
         |	  ,split(parse_city(round(latitude,6),round(longitude,6)),',') c1
         |	from tmp.event_log_splited where dt = '${dt}'
         |
         |""".stripMargin)

    df2.persist(StorageLevel.MEMORY_AND_DISK)
    df2.createTempView("t2")

    //val rdd2: RDD[Row] = df2.where($"c1"(0) === "null").rdd

    spark.sql(
      s"""
         |
         |insert overwrite table dwd.event_log_detail
         |partition(dt='${dt}')
         |select
         |  account
         |  ,appid
         |  ,appversion
         |  ,carrier
         |  ,deviceid
         |  ,devicetype
         |  ,eventid
         |  ,ip
         |  ,latitude
         |  ,longitude
         |  ,nettype
         |  ,osname
         |  ,osversion
         |  ,properties
         |  ,releasechannel
         |  ,resolution
         |  ,sessionid
         |  ,`timestamp`
         |  ,newsessionid
         |  ,c1[0]  prov
         |  ,c1[1]  city
         |  ,c1[2]  district
         |from t2
         |
         |""".stripMargin)

    //4.从累加器获取集合
    val list = acc1.value
    print(list)

    val list2 = list.map(v => {
      val arr: Array[String] = v.split(",")
      (arr(0), arr(1), arr(2), arr(3), arr(4), arr(5))
    }).filter(v => v._1 != "null")

    print(list2)
    //val rdd2: RDD[(String, String, String)] = spark.sparkContext.makeRDD(list2)
    val df3: DataFrame = list2.toDF("prov", "city", "district","longitude","latitude","hash5")
    df3.createTempView("t3")
    spark.sql(


//      CAST('latitude' AS DOUBLE)
//      CAST('longitude' AS DOUBLE), CAST('latitude' AS DOUBLE)
      """
        |
        |insert into table dim.area_geo
        |select
        |   prov,city,district,null,CAST('longitude' AS DOUBLE), CAST('latitude' AS DOUBLE),hash5,null
        |from t3
        |
        |""".stripMargin)

    spark.stop()
  }
}

2.2 需求代码测试

测试在本地模式同样需要先设置主函数日期参数。

2.2.1 AppLogToDWD_03本地测试运行:

代码运行成功后,能够看到查找到的不存在于dim维表地图中的地理数据信息,我们将其存入累加器,并且写入维表。

在这里插入图片描述在这里插入图片描述

这里登录高德开发平台,可以看到逆地理编码的调用次数已经有所增加,但是增加的查找仅仅只是维表中不存在的地理位置数据,所以api的使用次数经过优化后是可控的。

三、该需求用到的测试Demo

广播变量,累加器都是本项目的需求。

代码语言:javascript复制
package com.yh.test

import com.yh.acc.CutmAccumulator
import com.yh.utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

object Test6 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getSparkSession("xx")

    val sc: SparkContext = spark.sparkContext

    //累加器
    /*val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    var sum = 0
    val c1: LongAccumulator = sc.longAccumulator("c1")
    rdd.foreach(v => {
      //sum  = v
      c1.add(v)
    })
    println(c1.value)*/

    //1.定义一个累加器
    val acc1 = new CutmAccumulator
    //2.注册累加器
    sc.register(acc1)

    val rdd: RDD[Int] = sc.makeRDD(1 to 10,2)
    rdd.foreach(v => {
      if(v %2 == 0){
        acc1.add(v)
      }
    })

    val list: ListBuffer[Int] = acc1.value
    println(list)

    sc.stop()
  }
}
代码语言:javascript复制
package com.yh.test

import com.yh.acc.CutmAccumulator
import com.yh.utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

object Test6 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getSparkSession("xx")

    val sc: SparkContext = spark.sparkContext

    //累加器
    /*val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    var sum = 0
    val c1: LongAccumulator = sc.longAccumulator("c1")
    rdd.foreach(v => {
      //sum  = v
      c1.add(v)
    })
    println(c1.value)*/

    //1.定义一个累加器
    val acc1 = new CutmAccumulator
    //2.注册累加器
    sc.register(acc1)

    val rdd: RDD[Int] = sc.makeRDD(1 to 10,2)
    rdd.foreach(v => {
      if(v %2 == 0){
        acc1.add(v)
      }
    })

    val list: ListBuffer[Int] = acc1.value
    println(list)

    sc.stop()
  }
}

0 人点赞