一. 需求简介
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率 比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率.
产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。 数据分析师,可以此数据做更深一步的计算和分析。 企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。 在该模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应得 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。 需要注意的一点是,页面的访问时有先后的,要做好排序。
二. 思路分析
- 读取到规定的页面
- 过滤出来规定页面的日志记录, 并统计出来每个页面的访问次数 countByKey 是行动算子 reduceByKey 是转换算子
- 明确哪些页面需要计算跳转次数 1-2, 2-3, 3-4 …
- 按照 session 统计所有页面的跳转次数, 并且需要按照时间升序来排序
- 按照 session 分组, 然后并对每组内的 UserVisitAction 进行排序
- 转换访问流水
- 过滤出来和统计目标一致的跳转
- 统计跳转次数
- 计算跳转率
三. 具体代码实现
- 1. 业务代码
package com.buwenbuhuo.spark.core.project.app
import java.text.DecimalFormat
import com.buwenbuhuo.spark.core.project.bean.UserVisitAction
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
/**
**
*
* @author 不温卜火
* *
* @create 2020-07-30 15:19
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object PageConversion {
def statPageConversionRate(sc: SparkContext,
userVisitActionRDD: RDD[UserVisitAction],
pageString:String): Unit ={
// 1. 做出来目标跳转流 1,2,3,4,5,6,7
val pages: Array[String] = pageString.split(",")
val prePages: Array[String] = pages.take(pages.length -1)
val postPages: Array[String] = pages.takeRight(pages.length -1)
val targetPageFlows: Array[String] = prePages.zip(postPages).map {
case (pre, post) => s"$pre->$post"
}
/* // 1.1 把targetPages做广播变量,优化性能
val targetPageFlowsBC: Broadcast[Array[String]] = sc.broadcast(targetPageFlows)*/
// println(targetPageFlows.toList)
// 2. 计算分母,计算需要页面的点击量
val pageAndCount = userVisitActionRDD
.filter(action => prePages.contains(action.page_id.toString))
.map(action => (action.page_id, 1))
.countByKey()
// println(pageAndCount) // 没问题
// 3. 计算分子
// 3.1 按照sessionID分组,不能先对需要的页面做过滤,否则会应用调整的逻辑
val sessionIdGrouped: RDD[(String, Iterable[UserVisitAction])] = userVisitActionRDD.groupBy(_ .session_id)
val pageFlowsRDD: RDD[String] = sessionIdGrouped.flatMap {
case (sid, actionIt) =>
// 每个session的行为做一个按照时间排序
val actions: List[UserVisitAction] = actionIt.toList.sortBy(_.action_time)
val preActions: List[UserVisitAction] = actions.take(actions.length -1)
val postActions: List[UserVisitAction] = actions.takeRight(actions.length -1)
preActions.zip(postActions).map {
case (preAction, postAction) => s"${preAction.page_id}->${postAction.page_id}"
}.filter(flow => targetPageFlows.contains(flow))
// .filter(flow => targetPageFlowsBC.value.contains(flow)) // 使用广播变量 本人使用有错误
}
// pageFlowsRDD.collect.foreach(println)
// 3.2 聚合
val pageFlowsAndCount = pageFlowsRDD.map((_, 1)).countByKey()
// 序列化
val f: DecimalFormat = new DecimalFormat(".00%")
// 4. 计算调整率
val result = pageFlowsAndCount.map {
// pageAndCount 分母
// 1-> 2 count/1的点击量
case (flow, count) =>
val rate = count.toDouble / pageAndCount(flow.split("->")(0).toLong)
(flow,f.format(rate))
}
println(result)
}
}
/*
1,2,3,4,5,6,7 计算他们的转换率
1. 想办法做出来跳转流
“ 1->2 ”,“ 2->3 ”,“ 3->4 ” ...
2. 计算跳转率
1 -> 2 调整率
分子
“1->2” 跳转流的个数
如何计算?
1. 保证是同一session才能计算,其实就是按照session进行分组
2. 按照时间排序
3. RDD[“1->2”,“2->3”,“3->4”] map() reduceByKey
RDD[UserVisitAction] map
RDD[1,2,3,4,5,6,7,8]
如果做跳转流:
rdd1 = RDD[1,2,3,4,5,6]
rdd2 = RDD[2,3,4,5,6,7,8]
rdd3 = rdd1.zip(zip).map(...)
过滤出来目标跳转流,然后再聚合
分母
页数1的点击数
*/
- 2. 主项目代码
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
**
*@author 不温卜火
**
* @create 2020-07-29 12:18
**
* MyCSDN :https://buwenbuhuo.blog.csdn.net/
*/
object ProjectApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
// 把数据从文件读出来
val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt")
// 把数据封装好(封装到样例类中)
// sourceRDD.collect.foreach(println)
val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong)
})
// 三
PageConversion.statPageConversionRate(sc,userVisitActionRDD,"1,2,3,4,5,6,7,8")
// 关闭项目(sc)
sc.stop()
}
}
四. 运行结果
本次的分享就到这里了