Spark Core项目实战(2) | Top10热门品类中每个品类的 Top10 活跃 Session 统计

2020-10-28 16:56:01 浏览数 (2)

一. 需求分析

  对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。(注意: 这里我们只关注点击次数, 不关心下单和支付次数)   这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。   这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。

二. 思路

  1. 过滤出来 category Top10的日志
  2. 需要用到需求1的结果, 然后只需要得到categoryId就可以了
  3. 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量 => RDD[(categoryId, sessionId), count]
  4. 统计每个品类 top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
  5. 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10
  6. 把数据封装到 CategorySession 中

三. 项目实现

1. bean类

  • 1. 创建SessionInfo
代码语言:javascript复制
case class SessionInfo(sessionId:String,
                       count: Long) extends Ordered[SessionInfo]{
      // 按照降序排列
      // else if (this.count == that.count) 0  这个不能加,否则会去重
      override def compare(that: SessionInfo): Int =
        if (this.count >that.count) -1

        else 1

2. 具体实现

  • 1. 主类实现
代码语言:javascript复制
package com.buwenbuhuo.spark.core.project.app

import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  **
*@author 不温卜火
  **
  * @create 2020-07-29 12:18
  **
  *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
  */
object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    // 把数据从文件读出来
    val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt")

    // 把数据封装好(封装到样例类中)
//    sourceRDD.collect.foreach(println)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
    val fields: Array[String] = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })


        // 需求1:
         val categoryTop10: List[CategoryCountInfo] = CategoryTopApp.calcCatgoryTop10(sc , userVisitActionRDD)

        // 需求2:top10品类的top10session

    	CategorySessionTopApp.statCategorySessionTop10(sc,categoryTop10,userVisitActionRDD)


    // 关闭项目(sc)
    sc.stop()

  }

}
  • 2. 解决方案1(原始方法,没任何优化)
代码语言:javascript复制
package com.buwenbuhuo.spark.core.project.app

import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, SessionInfo, UserVisitAction}
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-07-29 20:07
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object CategorySessionTopApp {

    /*
    1. 最原始方法 ,没有任何优化,方案1
    */
    def statCategorySessionTop10(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
        // 1. 过滤出来只包含 top10 品类id的那些点击记录
        // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
      val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
      val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))


      // 2.每个品类top10session的计算

      // 2.1 先map出来需要字段
      val cidSidAndOne: RDD[((Long, String), Int)] =
      filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))

      // 2.2 做聚合操作   得到RDD[((cid,sid),count)]
      val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_   _)

      // map 出来想要的数据结构
      val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map {
        case ((cid, sid), count) => (cid, (sid, count))
      }

      // 2.3 分组  排序取Top10
      val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()

      // 2.4 对每个值排序取top10

      //  解法1: 最原始的写法 最不好的写法,it.toList一时爽,不过最终可能会因为内存原因而爆掉
      val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{
        // 只能使用scala排序,scala排序必须把所有数据全部加载到内存才能排。
        // 如果数据量很小可以 ,数据量大就不行了
        it.toList.sortBy(-_._2).take(10)
      })

      result.collect.foreach(println)

    }


}


/*
计算热门session口径:看每个session的点击记录

1. 过滤出来只包含 top10 品类id的那些点击记录

2. 每个品类top10session的计算
    => RDD[(品类id, sessionId))] map
    => RDD[(品类id, sessionId), 1)]  reduceByKey
    => RDD[(品类id, sessionId), count)]    map
    => RDD[品类id, (sessionId, count)]     groupByKey
    RDD[品类id, Iterator[(sessionId, count)]]  map内部,对iterator排序,取前10

----

 */
  • 3. 解决方案2
代码语言:javascript复制
  /*
    2. 解决方案2:
            每次排序一个cid,需要排10次
    */
    def statCategorySessionTop10_2(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
    // 1. 过滤出来只包含 top10 品类id的那些点击记录
    // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
    val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
    val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))

      // 方案2写法1:
     /*
       // 2. 需要排10次
        cids.foreach(cid =>{
          // 2.1 先过滤出来点击id是cid的那些记录
          val cidUserVisitActionRDD: RDD[UserVisitAction] = filteredUserVisitActionRDD.filter(_.click_category_id == cid)
          // 2.2 聚合
          val result: Map[Long, List[(String, Int)]] = cidUserVisitActionRDD
            .map(action => ((action.click_category_id, action.session_id), 1))
            .reduceByKey(_   _)
            .map {
              case ((cid, sid), count) => (cid, (sid, count))
            }
            .sortBy(- _._2._2)
            .take(10)
            .groupBy(_ . _1)
            .map{
              case (cid,arr) => (cid,arr.map(_._2).toList)
            }

          println(result.toMap)

        })
        */

     // 方案2写法2:
     // 2. 需要排10次
      val temp: List[Map[Long, List[(String, Int)]]] = cids.map(f = cid => {
        // 2.1 先过滤出来点击id是cid的那些记录
        val cidUserVisitActionRDD: RDD[UserVisitAction] = filteredUserVisitActionRDD.filter(_.click_category_id == cid)
        // 2.2 聚合
        val r: Map[Long, List[(String, Int)]] = cidUserVisitActionRDD
          .map(action => ((action.click_category_id, action.session_id), 1))
          .reduceByKey(_   _)
          .map {
            case ((cid, sid), count) => (cid, (sid, count))
          }
          .sortBy(-_._2._2)
          .take(10)
          .groupBy(_._1)
          .map {
            case (cid, arr) => (cid, arr.map(_._2).toList)
          }
        r
      })
      val result: List[(Long, List[(String, Int)])] = temp.flatMap(map => map)
      result.foreach(println)

  }


/*


使用scala的排序,会导致内存溢出
问题解决方案:
    方案2:
        1. 使用spark排序,来解决问题
        2. spark的排序是整体排序。 不能直接使用spark排序
        3. 10个品类id,我就使用spark的排序功能排10次

        优点:
            一定能完成,不会oom
         缺点:
             要起10个Job,排序10次


 */
  • 4. 解决方案3
代码语言:javascript复制
     /*
    3. 解决方案3:
            找一个可以排序的集合,然后时刻保持这个集合中只有10最大的元素
    */
      def statCategorySessionTop10_3(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
        // 1. 过滤出来只包含 top10 品类id的那些点击记录
        // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
        val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
        val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))



        // 2.每个品类top10session的计算

        // 2.1 先map出来需要字段
        val cidSidAndOne: RDD[((Long, String), Int)] =
          filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))

        // 2.2 做聚合操作   得到RDD[((cid,sid),count)]
        val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_   _)

        // map 出来想要的数据结构
        val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map {
          case ((cid, sid), count) => (cid, (sid, count))
        }

        // 2.3 分组  排序取Top10
        val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()

        // 2.4 对每个值排序取top10
        val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{
          // 不要把Iterable直接转成list再排序
          var set = mutable.TreeSet[SessionInfo]()
          it.foreach{
            case (sid,count) =>
              val info: SessionInfo = SessionInfo(sid, count)
              set  = info
              if(set.size > 10) set = set.take(10)

          }
          set.toList
        })

        // 起1 job
        result.collect.foreach(println)
        Thread.sleep(1000000)


      }

/*


     方案3:
         内存溢出,iterable => 转换list

         最终的目的top10
         搞一个集合,这集合中永远只保存10个元素,用于最大的10个元素

         先聚合,聚合后分组,分组内做了排序(用了自动排序的功能集合TreeSet)

         优点:
              一定可以完成,也不会oom,job也是只有一个job

          坏处:
               做了两次shuffle,效率比较低下


 */
  • 5. 解决方案4
代码语言:javascript复制
    /*
    4. 解决方案4:
            去掉groupBy,减少shuffle的次数
    */
      def statCategorySessionTop10_4(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
        // 1. 过滤出来只包含 top10 品类id的那些点击记录
        // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
        val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
        val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))



        // 2.每个品类top10session的计算

        // 2.1 先map出来需要字段
        val cidSidAndOne: RDD[((Long, String), Int)] =
          filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))

        // 2.2 做聚合操作   得到RDD[((cid,sid),count)]
        val cidSidAndCount: RDD[((Long, String), Int)] =
          cidSidAndOne.reduceByKey(new CategorySessionPartitioner(cids),_   _)

        // 2.3 cidSidAndCount 执行mapPartitions
        val result: RDD[(Long, List[SessionInfo])] = cidSidAndCount.mapPartitions(it => {
          // 不要把Iterable直接转成list再排序
          var set = mutable.TreeSet[SessionInfo]()
          var  categoryId = -1L
          it.foreach {
            case ((cid, sid), count) =>
              categoryId = cid
              val info: SessionInfo = SessionInfo(sid, count)
              set  = info
              if (set.size > 10) set = set.take(10)

          }
//          set.map((categoryId, _)).toIterator
          Iterator((categoryId, set.toList))
        })
          result.collect.foreach(println)

          Thread.sleep(1000000)


      }
}

class CategorySessionPartitioner(cids:List[Long]) extends Partitioner {
  private val cidIndexMap: Map[Long, Int] = cids.zipWithIndex.toMap

  // 分区和品类id数量保持一致,可以保证一个的分区只有一个cid
  override def numPartitions: Int = 10

  // (Long,String) => (cid,sessionId)
  override def getPartition(key: Any): Int = key match {
      // 使用这个cid在数组中的下标作为分区的索引非常合适
    case (cid:Long,_) => cidIndexMap(cid)
  }

}

/*

       方案4:
          对方案3做优化,减少一次shuffle
          减少shuffle只能是去掉groupByKey

          还得需要得到每一个cid的所有session的集合?! 怎样得到?
          rdd是分区的,mapPartitions(it => {})
          能不能让一个分区只有一个cid的所有数据

          每个分区只有一种cid,如何做到每个分区只有一个cid?
          用自定义区分器!
          10cid,应该有10个分区


 */

综合上述四种方法 最后一种方法是最完美的

  本次的分享就到这里了

0 人点赞