写在前面
Q1:为什么要这么做优化?即此文的目的。
因为全部调用高德API会造成高并发的问题,超出高德的调用规范,这也解释了为什么前面需求二我们只查找毕导用户。因此,在不给高德充钱的前提下,我们采用维表 高德api调用少数可以继续进行优化。
Q2:维表中的数据是什么?为什么还有结合高德api?
维表中的数据是全国的地理位置hash解析,是公开的,我们提前准备好的数据库资源。但是ods层实际用户坐标的地理hash可能有不在维表中的情况,因此有了本需求,即结合高德api完善维表中的信息。
本项目需求运行在Hadoop10单机环境:
- Spark3.2.0
- Flink1.13.6
- Hadoop3.1.4
- jdk1.8
- Sqoop1.4.6
- MySQL5.7
- Hive3.1.2
- Kafka0.11
- Flume1.9.0
- Zookeeper3.4.6
- Hbase2.4
- Redis6.2.0
- Dlink0.7.3
Windows11 开发环境:
- Idea 2020
- Moba
- DBeaver7.0.0
- Scala2.12.17
项目中所需的表,工具类,可以在前文需求中找到。
一、地理位置解析需求优化
在需求二的地理位置解析需求中,我们暂时指定了一位叫毕导的用户测试。将其写入了dwd.event_log_detail
表,在本需求中,我们将清空``dwd.event_log_detail` 表,重新优化代码。
需求二我们做的是:
代码语言:javascript复制1.读取tmp.event_log_splited,计算经纬度的geohash
2.读取dim.area_geo
3.前边两步查询的结果进行join
处理:如果连接条件是geohash5,则产生数据膨胀问题
需求三,现在我们的优化是:
代码语言:javascript复制1.如果用户上报的地理位置计算的geohash5在dim.area_geo维表中不存在
2.则调用高德api查询对应的省市区,并且插入dim.area_geo维表
二、需求代码编写
2.1 新的优化需求技术组件
2.1.1 需求util工具及bean
地图解析util:
代码语言:javascript复制package com.yh.utils
import cn.hutool.json.{JSONObject, JSONUtil}
import org.apache.commons.lang3.StringUtils
import scalaj.http.{Http, HttpRequest}
object GaoUtils {
def getLocaition(latitude:Double,longitude:Double) = {
val request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
.param("key", "1ee98f687f76d5428f279cd0dbc5ad85")
.param("location", longitude "," latitude)
val str: String = request.asString.body
// print(str)
if(!StringUtils.isBlank(str) && JSONUtil.isTypeJSON(str)){
val jSONObject: JSONObject = JSONUtil.parseObj(str)
if("1".equals(jSONObject.getStr("status"))){
val jSONObject2: JSONObject = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
val province = jSONObject2.getStr("province")
val city = jSONObject2.getStr("city").replace("\[]","")
val district = jSONObject2.getStr("district")
province "," city "," district
}else{
"null,null,null"
}
}else{
"null,null,null"
}
}
def main(args: Array[String]): Unit = {
val str = GaoUtils.getLocaition(35.727602, 111.256812)
println(str)
}
}
累加器util:
代码语言:javascript复制package com.yh.acc
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer
class CutmAccumulatorString extends AccumulatorV2[String, ListBuffer[String]] {
val list = new ListBuffer[String]()
// 当前的累加器是否为初始化状态,只需要判断一下list是否为空即可
override def isZero: Boolean = list.isEmpty
// 复制累加器对象
override def copy(): AccumulatorV2[String, ListBuffer[String]] = {
new CutmAccumulatorString
}
// 重置累加器对象
override def reset(): Unit = {
list.clear()
}
// 向累加器中增加值
override def add(v: String): Unit = {
list.append(v) //list.add()
}
// 重写方法,实现自定义业务
/*def add(v: String, t: String): Unit = {
if (v.contains(t)) {
list.append(v)
}
}*/
// 合并累加器
override def merge(other: AccumulatorV2[String, ListBuffer[String]]): Unit = {
list. =(other.value) //list.addAll()
}
// 获取累加器的结果
override def value: ListBuffer[String] = list
}
bean只用了一行代码存储位置
代码语言:javascript复制package com.yh.bean
case class Location(prov:String,city:String,district:String)
2.1.2 地理数据解析主程序
com.yh.ods_etl.AppLogToDWD_03:
代码语言:javascript复制我们将使用以下技术和组件:
Broadcast 变量:通过 Broadcast 变量,在集群中将 map 集合广播到每个 Spark Executor 节点上,以便在每个节点上使用该集合的副本,提高性能和效率。
累加器(Accumulator):代码中创建了一个自定义的字符串累加器 CutmAccumulatorString,用于收集特定条件下的数据,并在处理完成后获取累加器的值。
UDF(User-Defined Function):通过注册自定义的 UDF 函数 parse_city,在 Spark SQL 中进行经纬度解析和处理。该函数根据经纬度计算出对应的 Geohash5,并根据该 Geohash5 查询相应的省市区信息。如果该 Geohash5 在 dim.area_geo 维度表中不存在,则调用高德 API 查询对应的省市区信息,并插入到 dim.area_geo 维度表中。
DataFrame 操作:代码中使用了 DataFrame 的读取、转换和持久化操作。通过读取 dim.area_geo 维度表和临时表 tmp.event_log_splited,进行数据处理和计算,并最终将结果插入到 dwd.event_log_detail 表中。
代码语言:javascript复制package com.yh.ods_etl
import ch.hsr.geohash.GeoHash
import com.yh.acc.CutmAccumulatorString
import com.yh.bean.Location
import com.yh.utils.{GaoUtils, SparkUtils}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
object AppLogToDWD_03 {
def main(args: Array[String]): Unit = {
if(args.length == 0){
println("缺失参数")
System.exit(0)
}
val spark = SparkUtils.getSparkSession("AppLogToDWD_03")
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val dt = args(0)
val df: DataFrame = spark.read.table("dim.area_geo") //1.读取dim.area_geo维度表
val rdd = df.rdd.map(row => {
val geohash5 = row.getAs[String]("geohash5")
val prov = row.getAs[String]("prov")
val city = row.getAs[String]("city")
val district = row.getAs[String]("district")
(geohash5, Location(prov, city, district))
})
val map = rdd.collectAsMap() //将rdd转换成map集合 collectAsMap、collect
val bc: Broadcast[collection.Map[String, Location]] = spark.sparkContext.broadcast(map)
//val list = new ListBuffer[String]
//1.定义一个累加器对象
val acc1 = new CutmAccumulatorString
//2.注册一个累加器
spark.sparkContext.register(acc1)
spark.udf.register("parse_city",(latitude:Double,longitude:Double) => {
//计算用户上报经纬度的geohash
val hash5 = GeoHash.geoHashStringWithCharacterPrecision(latitude, longitude, 5)
//从map集合(Geohash字典)中获取对应的省市区
val option = bc.value.get(hash5)
if(option.isDefined){
val value = option.get
value.prov "," value.city "," value.district
}else{
//"null,null,null"
val str: String = GaoUtils.getLocaition(latitude, longitude)
//list.append(str s",${longitude},${latitude},${hash5}")
//3.将元素添加到累加器
acc1.add(str s",${longitude},${latitude},${hash5}")
str
}
})
val df2: DataFrame = spark.sql(
s"""
| select
| *
| ,split(parse_city(round(latitude,6),round(longitude,6)),',') c1
| from tmp.event_log_splited where dt = '${dt}'
|
|""".stripMargin)
df2.persist(StorageLevel.MEMORY_AND_DISK)
df2.createTempView("t2")
//val rdd2: RDD[Row] = df2.where($"c1"(0) === "null").rdd
spark.sql(
s"""
|
|insert overwrite table dwd.event_log_detail
|partition(dt='${dt}')
|select
| account
| ,appid
| ,appversion
| ,carrier
| ,deviceid
| ,devicetype
| ,eventid
| ,ip
| ,latitude
| ,longitude
| ,nettype
| ,osname
| ,osversion
| ,properties
| ,releasechannel
| ,resolution
| ,sessionid
| ,`timestamp`
| ,newsessionid
| ,c1[0] prov
| ,c1[1] city
| ,c1[2] district
|from t2
|
|""".stripMargin)
//4.从累加器获取集合
val list = acc1.value
print(list)
val list2 = list.map(v => {
val arr: Array[String] = v.split(",")
(arr(0), arr(1), arr(2), arr(3), arr(4), arr(5))
}).filter(v => v._1 != "null")
print(list2)
//val rdd2: RDD[(String, String, String)] = spark.sparkContext.makeRDD(list2)
val df3: DataFrame = list2.toDF("prov", "city", "district","longitude","latitude","hash5")
df3.createTempView("t3")
spark.sql(
// CAST('latitude' AS DOUBLE)
// CAST('longitude' AS DOUBLE), CAST('latitude' AS DOUBLE)
"""
|
|insert into table dim.area_geo
|select
| prov,city,district,null,CAST('longitude' AS DOUBLE), CAST('latitude' AS DOUBLE),hash5,null
|from t3
|
|""".stripMargin)
spark.stop()
}
}
2.2 需求代码测试
测试在本地模式同样需要先设置主函数日期参数。
2.2.1 AppLogToDWD_03本地测试运行:
代码运行成功后,能够看到查找到的不存在于dim维表地图中的地理数据信息,我们将其存入累加器,并且写入维表。
这里登录高德开发平台,可以看到逆地理编码的调用次数已经有所增加,但是增加的查找仅仅只是维表中不存在的地理位置数据,所以api的使用次数经过优化后是可控的。
三、该需求用到的测试Demo
广播变量,累加器都是本项目的需求。
代码语言:javascript复制package com.yh.test
import com.yh.acc.CutmAccumulator
import com.yh.utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
object Test6 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkUtils.getSparkSession("xx")
val sc: SparkContext = spark.sparkContext
//累加器
/*val rdd: RDD[Int] = sc.makeRDD(1 to 10)
var sum = 0
val c1: LongAccumulator = sc.longAccumulator("c1")
rdd.foreach(v => {
//sum = v
c1.add(v)
})
println(c1.value)*/
//1.定义一个累加器
val acc1 = new CutmAccumulator
//2.注册累加器
sc.register(acc1)
val rdd: RDD[Int] = sc.makeRDD(1 to 10,2)
rdd.foreach(v => {
if(v %2 == 0){
acc1.add(v)
}
})
val list: ListBuffer[Int] = acc1.value
println(list)
sc.stop()
}
}
代码语言:javascript复制package com.yh.test
import com.yh.acc.CutmAccumulator
import com.yh.utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
object Test6 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkUtils.getSparkSession("xx")
val sc: SparkContext = spark.sparkContext
//累加器
/*val rdd: RDD[Int] = sc.makeRDD(1 to 10)
var sum = 0
val c1: LongAccumulator = sc.longAccumulator("c1")
rdd.foreach(v => {
//sum = v
c1.add(v)
})
println(c1.value)*/
//1.定义一个累加器
val acc1 = new CutmAccumulator
//2.注册累加器
sc.register(acc1)
val rdd: RDD[Int] = sc.makeRDD(1 to 10,2)
rdd.foreach(v => {
if(v %2 == 0){
acc1.add(v)
}
})
val list: ListBuffer[Int] = acc1.value
println(list)
sc.stop()
}
}