基于Spark Graphx实现ID-Mapping

2020-10-16 16:42:02 浏览数 (2)

一. 背景

通常公司有产品矩阵,而每个产品都有自己的注册账号体系,并由此产生各自的用户ID。从公司全局来看,在整合用户表和用户行为数据时,确定不同产品的用户ID是否属于同一个人非常重要,这关系到用户行为分析、用户画像、用户数据挖掘等业务需求。

二. 开发实现

1. 通过采集的数据内容,对应用户数据样例如下:

代码语言:javascript复制
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","imsi":"imsi_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_002"}}
{"name":"zs","uid":"u_001","phone":{"mac":"mac_zs_002","imsi":"imsi_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_001","mac":"mac_ls_001","imsi":"imsi_ls_001","androidId":"androidId_ls_001","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_002","mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_002","uuid":"uuid_ls_002"}}

2. 对应方案内容

通常我们的数据内容,涵盖很多的唯一ID字段,如:用户id、电子邮件,以及在移动APP中经常使用的唯一设备信息(imei、imsi、mac、Device ID 等),通过这些唯一id字段进行mapping设计,识别是否是同一个人并产生OneId。

3.spark代码实现

3.1 初始化One Id

代码语言:javascript复制

import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * First (bootstrap) run of the ID-mapping job.
 *
 * Reads one JSON record per line, collects the non-blank identifiers of each record
 * (`uid` plus the device ids stored under `phone`), links identifiers that co-occur in
 * the same record, and runs GraphX connected components on the resulting graph. The
 * (id, guid) pairs, where guid is the component id produced by GraphX, are written to
 * Parquet as the initial mapping.
 *
 * NOTE(review): vertex ids are `String.hashCode` values widened to Long, so two distinct
 * identifier strings that share the same 32-bit hash would be treated as one vertex.
 */
object IdMapFirst {

  /** Line-delimited JSON input with one user record per line. */
  private val InputPath = "file:///E://code//study//spark_ml//file//userInfo.json"

  /** Parquet location the initial (id, guid) mapping is written to. */
  private val OutputPath = "file:///E://code//study//spark_ml//file//userIds_demo"

  /** Keys of the device identifiers read from the nested `phone` object. */
  private val PhoneIdKeys = Seq("imei", "mac", "imsi", "androidId", "deviceId", "uuid")

  /** An edge is kept only when it was observed strictly more than this many times. */
  private val MinEdgeOccurrences = 2

  /**
   * Extracts all non-blank identifiers from one JSON record.
   *
   * @param line a single JSON document
   * @return `uid` followed by the device ids, with null/blank values removed
   */
  private def extractIds(line: String): Array[String] = {
    val jsonObj = JSON.parseObject(line)
    val uid = jsonObj.getString("uid")

    // A record without a "phone" object, or with a null device id, contributes no
    // device ids instead of failing with a NullPointerException.
    val deviceIds = Option(jsonObj.getJSONObject("phone")).toSeq.flatMap { phoneObj =>
      PhoneIdKeys.map(key => Option(phoneObj.getString(key)).getOrElse(""))
    }

    (uid +: deviceIds).filter(id => StringUtils.isNotBlank(id)).toArray
  }

  /**
   * Job entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()
    // Needed for the RDD -> DataFrame conversion (`toDF`) below.
    import spark.implicits._

    try {
      val data: RDD[Array[String]] = spark.read.textFile(InputPath).rdd
        // Skip blank lines so they are never handed to the JSON parser.
        .filter(line => StringUtils.isNotBlank(line))
        .map(line => extractIds(line))

      // One vertex per identifier: (hash of the id, id string).
      val vertices: RDD[(Long, String)] = data.flatMap(arr => {
        for (id <- arr) yield (id.hashCode.toLong, id)
      })
      vertices.foreach(ele => println(s"${ele._1} : ${ele._2}"))

      // Link every pair of identifiers that appear in the same record, count how often
      // each link occurs over the whole input, and drop the rarely seen ones.
      val edges: RDD[Edge[String]] = data.flatMap(arr => {
        for (i <- arr.indices; j <- i + 1 until arr.length)
          yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
      })
        .map(edge => (edge, 1))
        .reduceByKey(_ + _)
        .filter(tp => tp._2 > MinEdgeOccurrences)
        .map(tp => tp._1)

      // Build the graph from the vertex and edge sets.
      val graph = Graph(vertices, edges)

      // Connected components: each element is (vertex id, component id).
      val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
      val firstIds = res.toDF("id", "guid")

      firstIds.write.parquet(OutputPath)
    } finally {
      // Always release the session, even when the job fails part-way.
      spark.stop()
    }
  }

}

3.2 第二次数据和初始化数据合并

代码语言:javascript复制
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Incremental run of the ID-mapping job.
 *
 * Builds the identifier graph for the current input exactly like the bootstrap run,
 * merges it with the (id, guid) mapping persisted by a previous run, recomputes the
 * connected components, and re-uses an already known guid for a component whenever one
 * of its members is present in the previous mapping. The final mapping is joined back
 * to the original identifier strings and printed.
 *
 * NOTE(review): vertex ids are `String.hashCode` values widened to Long, so two distinct
 * identifier strings that share the same 32-bit hash would be treated as one vertex.
 */
object IdMapSecond {

  /** Line-delimited JSON input with one user record per line. */
  private val InputPath = "file:///E://code//study//spark_ml//file//userInfo.json"

  /**
   * Parquet location of the previously computed (id, guid) mapping.
   * NOTE(review): IdMapFirst in this file writes to ".../userIds_demo", while this job
   * reads ".../userIds_demo2" - confirm which path is intended.
   */
  private val PreviousIdMapPath = "file:///E://code//study//spark_ml//file//userIds_demo2"

  /** Keys of the device identifiers read from the nested `phone` object. */
  private val PhoneIdKeys = Seq("imei", "mac", "imsi", "androidId", "deviceId", "uuid")

  /**
   * An edge is kept only when it was observed strictly more than this many times.
   * The original comment mentioned an empirical threshold of 5, but the code has always
   * compared against 2; the code's behaviour is kept.
   */
  private val MinEdgeOccurrences = 2

  /**
   * Extracts all non-blank identifiers from one JSON record.
   *
   * @param line a single JSON document
   * @return `uid` followed by the device ids, with null/blank values removed
   */
  private def extractIds(line: String): Array[String] = {
    val jsonObj = JSON.parseObject(line)
    val uid = jsonObj.getString("uid")

    // A record without a "phone" object, or with a null device id, contributes no
    // device ids instead of failing with a NullPointerException.
    val deviceIds = Option(jsonObj.getJSONObject("phone")).toSeq.flatMap { phoneObj =>
      PhoneIdKeys.map(key => Option(phoneObj.getString(key)).getOrElse(""))
    }

    (uid +: deviceIds).filter(id => StringUtils.isNotBlank(id)).toArray
  }

  /**
   * Job entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()
    // Needed for the RDD -> DataFrame conversions (`toDF`) below.
    import spark.implicits._

    try {
      val data: RDD[Array[String]] = spark.read.textFile(InputPath).rdd
        // Skip blank lines so they are never handed to the JSON parser.
        .filter(line => StringUtils.isNotBlank(line))
        .map(line => extractIds(line))

      // One vertex per identifier: (hash of the id, id string).
      val vertices: RDD[(Long, String)] = data.flatMap(arr => {
        for (id <- arr) yield (id.hashCode.toLong, id)
      })

      vertices.foreach(ele => println(s"${ele._1} : ${ele._2}"))

      // Link every pair of identifiers that appear in the same record, count how often
      // each link occurs over the whole input, and drop the rarely seen ones.
      val edges: RDD[Edge[String]] = data.flatMap(arr => {
        for (i <- arr.indices; j <- i + 1 until arr.length)
          yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
      })
        .map(edge => (edge, 1))
        .reduceByKey(_ + _)
        .filter(tp => tp._2 > MinEdgeOccurrences)
        .map(tp => tp._1)

      // Load the mapping of the previous run once, as (id hash, guid) pairs, and derive
      // the vertices, edges and lookup table from that single parsed RDD.
      val firstIdmap = spark.read.parquet(PreviousIdMapPath)
      val previousPairs: RDD[(VertexId, VertexId)] = firstIdmap.rdd.map(row =>
        (row.getAs[VertexId]("id"), row.getAs[VertexId]("guid"))
      )

      val firstVertices: RDD[(Long, String)] = previousPairs.map(pair => (pair._1, ""))
      // Connecting every known id to its guid keeps previously merged ids together.
      val firstEdges: RDD[Edge[String]] = previousPairs.map(pair => Edge(pair._1, pair._2, ""))

      // Build the graph from today's data plus the previous mapping.
      val graph = Graph(vertices.union(firstVertices), edges.union(firstEdges))

      // Each element is (vertex id, component id).
      val result: VertexRDD[VertexId] = graph.connectedComponents().vertices

      // The previous mapping is collected to the driver and broadcast for lookups.
      val bcMap = spark.sparkContext.broadcast(previousPairs.collectAsMap())

      val todayIdmap = result.map(tup => (tup._2, tup._1))
        .groupByKey()
        .flatMap(group => {
          val componentId = group._1
          val ids = group._2
          val knownGuids = bcMap.value
          // Re-use the guid of the first member that already has one; otherwise fall
          // back to the component id computed in this run.
          val guid = ids.find(id => knownGuids.contains(id))
            .map(id => knownGuids(id))
            .getOrElse(componentId)
          ids.map(id => (id, guid))
        })
        .toDF("id", "guid")

      todayIdmap.show()

      todayIdmap.createOrReplaceTempView("id_guid")

      // (id hash, original id string) pairs of today's input, used to translate the
      // hashes back to readable identifiers.
      val data2 = vertices.toDF("id", "str_id")

      data2.createOrReplaceTempView("id_original")

      // Left join: ids that only exist in the previous mapping have no string form in
      // today's input, so their `id` column is null.
      val output = spark.sql("""select
                               |    t1.id as id_hashcode,
                               |    t2.str_id as id,
                               |    t1.guid as guid
                               |from id_guid t1
                               |left join id_original t2 on t1.id = t2.id
                               |group by t1.id, t2.str_id, t1.guid
                               |order by guid""".stripMargin
      )
      output.show()
    } finally {
      // Always release the session, even when the job fails part-way.
      spark.stop()
    }
  }

}

输出展示:

代码语言:javascript复制
 ----------- ---------------- ----------- 
|id_hashcode|              id|       guid|
 ----------- ---------------- ----------- 
|-1381665248|androidId_zs_001|-1908595409|
|  110929767|           u_001|-1908595409|
|-1908595409|     uuid_zs_001|-1908595409|
|-1018465903|     imsi_zs_002|-1908595409|
|-1753513447| deviceId_zs_001|-1908595409|
| -714652388|      mac_zs_002|-1908595409|
|-1908595408|     uuid_zs_002|-1908595408|
|-1884715312|     imei_ls_001|-1884715312|
|-1884715311|     imei_ls_002|-1884715311|
|-1782473362|androidId_ls_001|-1782473362|
| 2140645735| deviceId_ls_001|-1782473361|
|-1419274017|     imsi_ls_002|-1782473361|
|  110929768|           u_002|-1782473361|
| 1985563773|     uuid_ls_001|-1782473361|
|-1115460503|      mac_ls_001|-1782473361|
|-1782473361|androidId_ls_002|-1782473361|
|-1483907198|     imei_zs_001|-1483907198|
|-1483907197|     imei_zs_002|-1483907197|
|-1419274018|     imsi_ls_001|-1419274018|
|-1381665247|androidId_zs_002|-1381665247|
 ----------- ---------------- ----------- 

当然guid也可以采用MD5/UUID的方式输出。

代码语言:javascript复制
    // DigestUtils is provided by commons-codec; it is imported locally so that this
    // snippet resolves without relying on an import that is not shown.
    import org.apache.commons.codec.digest.DigestUtils

    // Same join as before, but ordered by guid in descending order.
    val output = spark.sql("""select
                |    t1.id as id_hashcode,
                |    t2.str_id as id,
                |    t1.guid as guid
                |from id_guid t1
                |left join id_original t2 on t1.id = t2.id
                |group by t1.id, t2.str_id, t1.guid
                |order by guid desc""".stripMargin
    )

    // Each element is (original id string, guid as text, MD5 hex digest of the guid text).
    // The id string can be null: the query is a left join, so ids with no match in
    // id_original have no str_id.
    val info: RDD[(String, String, String)] = output.rdd.map(
      row => {
        val id = row.getAs[String]("id")
        val oneId = row.getAs[Long]("guid").toString
        (id, oneId, DigestUtils.md5Hex(oneId))
      }
    )
    
   <!-- Maven dependencies for the snippets above. -->
   <!-- commons-codec provides org.apache.commons.codec.digest.DigestUtils (md5Hex). -->
   <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.13</version>
    </dependency>

    <!-- NOTE(review): commons-lang 2.6 ships the org.apache.commons.lang package, whereas the
         Scala code imports org.apache.commons.lang3.StringUtils, which belongs to the
         org.apache.commons:commons-lang3 artifact. Confirm that commons-lang3 is on the
         classpath (for example transitively) or declare it explicitly. -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

0 人点赞