I. Background
A company usually runs a matrix of products, and each product issues its own user IDs through its own registration flow. From the company-wide perspective of consolidating user tables and user behavior data, determining that user IDs from different products belong to the same person is essential: it underpins user behavior analysis, user profiling, user data mining, and similar business needs.
II. Implementation
1. Sample user records, as collected, look like this:
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","imsi":"imsi_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_002"}}
{"name":"zs","uid":"u_001","phone":{"mac":"mac_zs_002","imsi":"imsi_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_001","mac":"mac_ls_001","imsi":"imsi_ls_001","androidId":"androidId_ls_001","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_002","mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_002","uuid":"uuid_ls_002"}}
2. Approach
Our data usually carries many unique-ID fields: the user id, email, and the unique device identifiers commonly available in mobile apps (imei, imsi, mac, deviceId, etc.). By building a mapping across these unique-ID fields we can recognize that different records belong to the same person and assign them one OneId; the sketch after this paragraph illustrates the idea.
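The core idea: treat every identifier value as a graph vertex, connect identifiers that co-occur in the same record with an edge, and let each connected component of the resulting graph stand for one person. A minimal, self-contained sketch of this idea on two hand-written records (the identifiers here are illustrative placeholders, not taken from the samples above):
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
object OneIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("oneid-sketch").master("local[1]").getOrCreate()
    val sc = spark.sparkContext
    // record A contains (u_001, imei_a); record B contains (u_001, imei_b)
    val vertices = sc.parallelize(Seq((1L, "u_001"), (2L, "imei_a"), (3L, "imei_b")))
    // one edge per co-occurrence of two identifiers inside a record
    val edges = sc.parallelize(Seq(Edge(1L, 2L, ""), Edge(1L, 3L, "")))
    // connectedComponents labels every vertex with the smallest vertex id
    // in its component, so all three identifiers share label 1 = one person
    Graph(vertices, edges).connectedComponents().vertices.collect().foreach(println)
    // prints (1,1), (2,1), (3,1)
    spark.stop()
  }
}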
3. Spark implementation
3.1 Initializing the OneId
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object IdMapFirst {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()
    // enable implicit conversions such as RDD -> DataFrame
    import spark.implicits._
    val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
    val data: RDD[Array[String]] = rawData.rdd.map(line => {
      // parse each line into a JSON object
      val jsonObj = JSON.parseObject(line)
      val uid = jsonObj.getString("uid")
      // the nested "phone" object holds the device identifiers
      val phoneObj = jsonObj.getJSONObject("phone")
      val imei = phoneObj.getOrDefault("imei", "").toString
      val mac = phoneObj.getOrDefault("mac", "").toString
      val imsi = phoneObj.getOrDefault("imsi", "").toString
      val androidId = phoneObj.getOrDefault("androidId", "").toString
      val deviceId = phoneObj.getOrDefault("deviceId", "").toString
      val uuid = phoneObj.getOrDefault("uuid", "").toString
      // keep only the non-blank identifiers of this record
      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })
    // one vertex per identifier, keyed by its hashCode
    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (id <- arr) yield (id.hashCode.toLong, id)
    })
    vertices.foreach(ele => println(ele._1 + " : " + ele._2))
    // pair every two identifiers of a record into an edge
    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
        yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      // drop edges whose co-occurrence count is below the empirical threshold
      .filter(tp => tp._2 > 2)
      .map(x => x._1)
    // build a graph from the vertex set and the edge set
    val graph = Graph(vertices, edges)
    // connected components: VertexRDD[VertexId] holds (vertexId, smallest id in its component)
    val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
    val firstIds = res.toDF("id", "guid")
    firstIds.write.parquet("file:///E://code//study//spark_ml//file//userIds_demo")
    spark.stop()
  }
}
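A caveat worth flagging: id.hashCode is only a 32-bit hash, so two distinct identifiers can collide on the same vertex id once the id space grows. If that risk matters, a 64-bit hash is a near drop-in replacement; a sketch using Guava (which Spark already depends on), my suggestion rather than part of the original code:
import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing

// 64-bit vertex id: far lower collision probability than String.hashCode
def vertexId(id: String): Long =
  Hashing.murmur3_128().hashString(id, StandardCharsets.UTF_8).asLong()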
3.2 Merging a new batch with the initial mapping
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object IdMapSecond {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()
    val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
    val data: RDD[Array[String]] = rawData.rdd.map(line => {
      // parse each line into a JSON object
      val jsonObj = JSON.parseObject(line)
      val uid = jsonObj.getString("uid")
      // the nested "phone" object holds the device identifiers
      val phoneObj = jsonObj.getJSONObject("phone")
      val imei = phoneObj.getOrDefault("imei", "").toString
      val mac = phoneObj.getOrDefault("mac", "").toString
      val imsi = phoneObj.getOrDefault("imsi", "").toString
      val androidId = phoneObj.getOrDefault("androidId", "").toString
      val deviceId = phoneObj.getOrDefault("deviceId", "").toString
      val uuid = phoneObj.getOrDefault("uuid", "").toString
      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })
    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (id <- arr) yield (id.hashCode.toLong, id)
    })
    vertices.foreach(ele => println(ele._1 + " : " + ele._2))
    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      // a double loop pairs every two identifiers of a record into an edge
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
        yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      // drop edges whose co-occurrence count is below the empirical threshold
      .filter(tp => tp._2 > 2)
      .map(x => x._1)
    // load the guid mapping produced by the first run
    val firstIdmap = spark.read.parquet("file:///E://code//study//spark_ml//file//userIds_demo")
    val firstVertices = firstIdmap.rdd.map {
      case Row(id_hashcode: VertexId, guid: VertexId) => (id_hashcode, "")
    }
    // turn each historical (id, guid) pair into an edge so old members
    // stay connected to their previous component
    val firstEdges = firstIdmap.rdd.map(row => {
      val id_hashcode = row.getAs[VertexId]("id")
      val guid = row.getAs[VertexId]("guid")
      Edge(id_hashcode, guid, "")
    })
    // build one graph from today's vertices/edges plus the historical ones
    val graph = Graph(vertices.union(firstVertices), edges.union(firstEdges))
    // result: VertexRDD[VertexId] => rdd of (vertexId, smallest id in its component)
    val result: VertexRDD[VertexId] = graph.connectedComponents().vertices
    // collect the previous (id_hashcode -> guid) mapping and broadcast it for lookup
    val idMap = firstIdmap.rdd.map(row => {
      val id_hashcode = row.getAs[VertexId]("id")
      val guid = row.getAs[VertexId]("guid")
      (id_hashcode, guid)
    }).collectAsMap()
    val bcMap = spark.sparkContext.broadcast(idMap)
    import spark.implicits._
    val todayIdmap = result.map(tup => (tup._2, tup._1))
      .groupByKey()
      .mapPartitions(iter => {
        iter.map(tup => {
          val idmapMap = bcMap.value
          var todayGuid = tup._1
          val ids = tup._2
          // scan the component's ids; if any of them already has a guid,
          // reuse it so the guid stays stable across runs
          var idFind = false
          for (id <- ids if !idFind) {
            val getGuid = idmapMap.get(id)
            if (getGuid.isDefined) {
              todayGuid = getGuid.get
              idFind = true
            }
          }
          (todayGuid, ids)
        })
      })
      .flatMap(tup => {
        val ids = tup._2
        val guid = tup._1
        for (ele <- ids) yield (ele, guid)
      }).toDF("id", "guid")
    todayIdmap.show()
    todayIdmap.createOrReplaceTempView("id_guid")
    val data2 = data.flatMap(arr => {
      for (id <- arr) yield (id.hashCode.toLong, id)
    }).toDF("id", "str_id")
    data2.createOrReplaceTempView("id_original")
    val output = spark.sql("""select
        |  t1.id as id_hashcode,
        |  t2.str_id as id,
        |  t1.guid as guid
        |from id_guid t1
        |left join id_original t2 on t1.id = t2.id
        |group by t1.id, t2.str_id, t1.guid
        |order by guid""".stripMargin)
    output.show()
    spark.stop()
  }
}
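For the next day's run to pick up today's result, todayIdmap has to be persisted as the new historical mapping. Overwriting the parquet path that firstIdmap was read from within the same job would fail, because Spark evaluates lazily and would re-read those files while overwriting them; writing to a fresh path (the path below is an assumption matching the demo paths above) avoids that:
// write today's mapping to a fresh path; point the next run's
// spark.read.parquet at this path instead of the old one
todayIdmap.write.mode("overwrite")
  .parquet("file:///E://code//study//spark_ml//file//userIds_demo_next")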
Output:
+-----------+----------------+-----------+
|id_hashcode|              id|       guid|
+-----------+----------------+-----------+
|-1381665248|androidId_zs_001|-1908595409|
|  110929767|           u_001|-1908595409|
|-1908595409|     uuid_zs_001|-1908595409|
|-1018465903|     imsi_zs_002|-1908595409|
|-1753513447| deviceId_zs_001|-1908595409|
| -714652388|      mac_zs_002|-1908595409|
|-1908595408|     uuid_zs_002|-1908595408|
|-1884715312|     imei_ls_001|-1884715312|
|-1884715311|     imei_ls_002|-1884715311|
|-1782473362|androidId_ls_001|-1782473362|
| 2140645735| deviceId_ls_001|-1782473361|
|-1419274017|     imsi_ls_002|-1782473361|
|  110929768|           u_002|-1782473361|
| 1985563773|     uuid_ls_001|-1782473361|
|-1115460503|      mac_ls_001|-1782473361|
|-1782473361|androidId_ls_002|-1782473361|
|-1483907198|     imei_zs_001|-1483907198|
|-1483907197|     imei_zs_002|-1483907197|
|-1419274018|     imsi_ls_001|-1419274018|
|-1381665247|androidId_zs_002|-1381665247|
+-----------+----------------+-----------+
Note that identifiers whose edges fell below the co-occurrence threshold (imei_zs_001, for example) remain singleton components and therefore get their own guid. The guid can of course also be emitted as an MD5 hash or a UUID instead of the raw Long:
import org.apache.commons.codec.digest.DigestUtils

val output = spark.sql("""select
    |  t1.id as id_hashcode,
    |  t2.str_id as id,
    |  t1.guid as guid
    |from id_guid t1
    |left join id_original t2 on t1.id = t2.id
    |group by t1.id, t2.str_id, t1.guid
    |order by guid desc""".stripMargin)
val info: RDD[(String, String, String)] = output.rdd.map(row => {
  val id = row.getAs[String]("id")
  val oneId = row.getAs[Long]("guid").toString
  // hash the Long guid into a 32-character hex string
  val oneId_Md5 = DigestUtils.md5Hex(oneId)
  (id, oneId, oneId_Md5)
})
The required Maven dependencies (the code imports org.apache.commons.lang3.StringUtils, so it needs commons-lang3 rather than the old commons-lang artifact):
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.13</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
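For the UUID option mentioned above, a deterministic variant (a sketch of one possible choice, not from the original code) is java.util.UUID.nameUUIDFromBytes, which derives a stable name-based UUID from the guid, so the same guid always maps to the same UUID across runs:
import java.nio.charset.StandardCharsets
import java.util.UUID

// name-based (type 3) UUID derived from the numeric guid
def guidToUuid(guid: Long): String =
  UUID.nameUUIDFromBytes(guid.toString.getBytes(StandardCharsets.UTF_8)).toString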