大数据【企业级360°全方位用户画像】标签开发代码抽取

2021-01-27 16:23:56 浏览数 (1)

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

在之前的几篇关于标签开发的博客中,博主已经不止一次地为大家介绍了开发代码书写的流程。无论是匹配型标签还是统计型标签,都涉及到了大量的代码重用问题。为了解决这个问题,本篇博客,我们将开始将对代码进行抽取,简便我们的开发!


1、创建一个特质

对于scala基础语法不太熟悉的朋友们可能有疑惑了。什么是特质呢?

其实关于scala中特质的介绍,博主在前几个月写scala专栏的时候就科普过了。感兴趣的朋友可以?《scala快速入门系列【特质】》

简单来说就是,scala中没有Java中的接口(interface),替代的概念是——特质。

特质是scala中代码复用的基础单元,特质的定义和抽象类的定义很像,但它是使用trait关键字。

我们先在IDEA中创建一个特质

然后咱们就可以开始写代码。

因为在前面的几篇具体讲解标签开发的博客中,博主已经将流程讲了好几遍,算得上是非常透彻了。所以本篇博客,博主在这直接贴上代码,并不做过多的过程说明。每一步具体的含义,都已经体现在了代码中,如果各位朋友们看了有任何的疑惑,可以私信我,也可以在评论区留言。

代码语言:javascript复制
package com.czxy.base

import java.util.Properties
import com.czxy.bean.HBaseMeta
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/*
 * @Author: Alice菌
 * @Date: 2020/6/13 08:49
 * @Description: 

    此代码用户编写用户画像项目可以重用的代码
 */
trait BaseModel {
  // 所有重复的代码(功能)都抽取到这里


  // 设置任务的名称
  def setAppName:String

  // 设置四级标签id
  def setFourTagId:String

  /* 1. 初始化SparkSession对象  */
  private val spark:SparkSession = SparkSession.builder().appName(setAppName).master("local[*]").getOrCreate()

  //导入隐式转换
  import org.apache.spark.sql.functions._
  import spark.implicits._

  /* 2. 连接MySQL  */
  // 读取application.conf 内的配置
  private val config: Config = ConfigFactory.load()
  // 获取url
  private val url : String = config.getString("jdbc.mysql.url")
  // 获取tableName
  private val tableName : String = config.getString("jdbc.mysql.tablename")

  
  def getMySQLDF = {
    // 连接MySQL数据库
    spark.read.jdbc(url,tableName,new Properties)
  }

  
  /* 3. 读取MySQL数据库的四级标签  */

  def getFourTag (mysqlCoon: DataFrame): HBaseMeta ={
    //读取HBase中的四级标签
    val fourTagsDS: Dataset[Row] = mysqlCoon.select("id","rule").where("id=" setFourTagId)
    //切分rule
    val KVMap: Map[String, String] = fourTagsDS.map(row => {
      // 获取到rule值
      val RuleValue: String = row.getAs("rule").toString
      // 使用“##”对数据进行切分
      val KVMaps: Array[(String, String)] = RuleValue.split("##").map(kv => {
        val arr: Array[String] = kv.split("=")
        (arr(0), arr(1))
      })
      KVMaps
    }).collectAsList().get(0).toMap       //封装成map
    //   将Map 转换成HbaseMeta样例类
    val hbaseMeta: HBaseMeta = toHBaseMeta(KVMap)
    hbaseMeta
  }

  /* 4. 读取五级标签数据【单独处理】*/
  def getFiveTagDF(mysqlConn:DataFrame)={

    mysqlConn.select("id","rule").where("pid=" setFourTagId).toDF()

  }

  /* 5. 读取hbase中的数据,这里将hbase作为数据源进行读取 */
  def getHbase(hbaseMeta: HBaseMeta)={
    val hbaseDatas: DataFrame = spark.read.format("com.czxy.tools.HBaseDataSource")
      // hbaseMeta.zkHosts 就是 192.168.10.20  和 下面是两种不同的写法
      .option("zkHosts",hbaseMeta.zkHosts)
      .option(HBaseMeta.ZKPORT, hbaseMeta.zkPort)
      .option(HBaseMeta.HBASETABLE, hbaseMeta.hbaseTable)
      .option(HBaseMeta.FAMILY, hbaseMeta.family)
      .option(HBaseMeta.SELECTFIELDS, hbaseMeta.selectFields)
      .load()

    println(hbaseMeta)

    hbaseDatas.show()

       hbaseDatas
  }

  /* 6. 五级数据与 HBase 数据进行打标签【单独处理】 */
  def getNewTag(spark: SparkSession,fiveTagDF:DataFrame,hbaseDF:DataFrame):DataFrame



  /**
    * 7.合并历史数据
    * 将标签写入HBase
    *
    * @param newTags 新标签
    * @return 返回最终标签
    */
  def joinAllTags(newTags: DataFrame): DataFrame = {
    //读取HBase 中的历史数据
    val oldTags: DataFrame = spark.read.format("com.czxy.tools.HBaseDataSource")
      .option(HBaseMeta.ZKHOSTS, "192.168.10.20")
      .option(HBaseMeta.ZKPORT, "2181")
      .option(HBaseMeta.HBASETABLE, "test")
      .option(HBaseMeta.FAMILY, "detail")
      .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
      .load()

    //使用join将新数据和旧数据的tagsId合并到一起
    val allTags: DataFrame = oldTags.join(newTags, oldTags("userId") === newTags("userId"))


    //  创建一个新的udf函数,用来拼接 tagsId
    val getAllTags: UserDefinedFunction = udf((oldTagsId: String, newTagsId: String) => {
      if (oldTagsId == "" && newTagsId != "") {
        newTagsId
      } else if (oldTagsId != "" && newTagsId == "") {
        oldTagsId
      } else if (oldTagsId == "" && newTagsId == "") {
        ""
      } else {
        val str: String = oldTagsId   ","   newTagsId
        str.split(",").distinct.mkString(",")
      }
    })

    //获取最终结果
    allTags.select(
      when(oldTags("userId").isNotNull, oldTags("userId"))
        .when(newTags("userId").isNotNull, newTags("userId"))
        .as("userId"),
      getAllTags(oldTags("tagsId"), newTags("tagsId"))
        .as("tagsId")
    )


  }


  /**
    * 8. 新建一个方法,用于保存数据  save
    * @param allTags   最终的结果
    */
  def save(allTags: DataFrame): Unit = {
    //把最终结果保存到HBase
    allTags.write.format("com.czxy.tools.HBaseDataSource")
      .option(HBaseMeta.ZKHOSTS, "192.168.10.20")
      .option(HBaseMeta.ZKPORT, "2181")
      .option(HBaseMeta.HBASETABLE, "test")
      .option(HBaseMeta.FAMILY, "detail")
      .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
      .save()

    println("结果保存完毕!!!")
  }


  /* 9. 断开连接 */
  def close(): Unit = {
    spark.close()
  }
   
  //将mysql中的四级标签的rule  封装成HBaseMeta
  //方便后续使用的时候方便调用
  def toHBaseMeta(KVMap: Map[String, String]): HBaseMeta = {
    //开始封装
    HBaseMeta(KVMap.getOrElse("inType",""),
      KVMap.getOrElse(HBaseMeta.ZKHOSTS,""),
      KVMap.getOrElse(HBaseMeta.ZKPORT,""),
      KVMap.getOrElse(HBaseMeta.HBASETABLE,""),
      KVMap.getOrElse(HBaseMeta.FAMILY,""),
      KVMap.getOrElse(HBaseMeta.SELECTFIELDS,""),
      KVMap.getOrElse(HBaseMeta.ROWKEY,"")
    )
  }


  /**
    * 按照先后顺序, 连接mysql数据库, 读取四级,五级HBase数据
    * 打标签,最终写入
    */
  def exec(): Unit = {
    //1.设置日志级别
    spark.sparkContext.setLogLevel("WARN")
    //2.连接mysql
    val mysqlConnection: DataFrame = getMySQLDF
    //3. 读取mysql数据库中的四级标签
    val fourTags: HBaseMeta = getFourTag(mysqlConnection)
    //4. 读取mysql数据库中的五级标签
    val fiveTags: Dataset[Row] = getFiveTagDF(mysqlConnection)
    //读取HBase 中的数据
    val hBaseMea: DataFrame = getHbase(fourTags)
    //读取新获取的数据
    val newTags: DataFrame = getNewTag(spark,fiveTags, hBaseMea)
    newTags.show()

    //获取最终结果
    val allTags: DataFrame = joinAllTags(newTags)
    allTags.show()

    //保存到HBase
    save(allTags)
    //断开连接
    close()
    
  }
  
  
}

2、调用特质

既然特质我们已经写好了,那么现在我们想要基于用户的工作进行统计型标签开发,那么我们就可以像下面的示例一样。

代码语言:javascript复制
import com.czxy.base.BaseModel
import com.czxy.bean.{HBaseMeta, TagRule}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction

/*
 * @Author: Alice菌
 * @Date: 2020/6/13 09:48
 * @Description:

     基于用户的Job标签,做测试使用
 */
object Test extends BaseModel{

  override def setAppName: String = "Job"

  override def setFourTagId: String = "65"

  // 重写Hbase数据与MySQL五级标签数据处理的方法
  override def getNewTag(spark: SparkSession, fiveTagDF: DataFrame, hbaseDF: DataFrame): DataFrame = {

    // 引入隐式转换
    import spark.implicits._
    //引入java 和scala相互转换
    import scala.collection.JavaConverters._
    //引入sparkSQL的内置函数
    import org.apache.spark.sql.functions._

    // 对5级标签的数据进行处理
    val fiveTageList: List[TagRule] = fiveTagDF.map(row => {
      // row 是一条数据
      // 获取出id 和 rule
      val id: Int = row.getAs("id").toString.toInt
      val rule: String = row.getAs("rule").toString

      // 封装样例类
      TagRule(id,rule)
    }).collectAsList()   // 将DataSet转换成util.List[TagRule]   这个类型遍历时无法获取id,rule数据
      .asScala.toList    // 将util.List转换成list   需要隐式转换    import scala.collection.JavaConverters._

    // 需要自定义UDF函数
    val getUserTags: UserDefinedFunction = udf((rule: String) => {

      // 设置标签的默认值
      var tagId: Int = 0
      // 遍历每一个五级标签的rule
      for (tagRule <- fiveTageList) {

        if (tagRule.rule == rule) {
          tagId = tagRule.id
        }
      }
      tagId
    })

    val jobNewTags : DataFrame = hbaseDF.select('id.as ("userId"),getUserTags('job).as("tagsId"))
    jobNewTags.show(5)

    jobNewTags

  }
  def main(args: Array[String]): Unit = {

    exec()

  }

}

我们可以发现,现在开发一个标签,我们只需要实现第一步写好的特质,然后在具体的类中设置任务的名称AppName和四级标签的id,以及重写Hbase数据与MySQL五级标签数据处理的方法。然后在程序的主入口main函数中,调用特质中的exec方法即可。

这大大的减少了我们的工作量。不知道各位朋友感受到了没有呢?

结语

博主在经过了几个小时的开发后,目前已经成功了开发了15个标签,分别是7个匹配型和8个统计型标签。等过段时间学习了挖掘算法后,再开发几个挖掘型的标签后,有希望完成超过20个标签?

如果以上过程中出现了任何的纰漏错误,烦请大佬们指正?

受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?

希望我们都能在学习的道路上越走越远?

0 人点赞