7 - Clickstream Data Analysis Project - Data Preprocessing

2022-11-12 16:29:16

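Table of contents

  • 7 - Clickstream Data Analysis Project - Data Preprocessing
    • 1. Dataset introduction
      • Raw data format:
      • Pageviews table to be generated (page clickstream model)
      • Visits table to be generated (clickstream model)
      • Data cleaning
    • 2. Collecting log data onto HDFS
      • Create the directory
      • Write the script
      • Script contents
        • Add execute permission to the script
        • Run the script
    • 3. Data preprocessing with MR: filtering static resources
      • Create the Maven project
      • Write the log entity class
      • Write the data-cleaning utility class
      • Write the preprocessing Mapper class
      • Write the preprocessing Driver class
    • 4. Data preprocessing with MR: building the pageviews data model
      • How pageviews works
      • Create the pageviews Mapper class
      • Create the pageviews Reducer class
      • Create the pageviews Driver class
    • 5. Data preprocessing with MR: building the visits data model
      • How the visits table works
      • Generate the visits data
      • Visits MR: entity class PageViewsBean
      • Visits MR: entity class VisitBean
      • Visits MR: Mapper class ClickStreamVisitMapper
      • Visits MR: Reducer class ClickStreamVisitReducer
      • Visits MR: Driver class ClickStreamVisitDriver
    • Import into HDFS
    • Summary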


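1. Dataset introduction

The dataset is introduced in https://blog.csdn.net/m0_38139250/article/details/122181337, which also gives an overview of the project.

Dataset download address:

https://download.csdn.net/download/m0_38139250/72088781

An excerpt of the data is listed below.

Raw data format: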

194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
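To make the fields of such a line concrete, here is a minimal sketch that pulls them apart with a regular expression. It assumes the raw log follows the usual access-log layout, with the timestamp in square brackets and straight double quotes around the request, referer and user agent. The class name AccessLogLineSketch and its parse method are hypothetical and are not part of the project code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the original project.
class AccessLogLineSketch {

    // Groups: 1 ip, 2 user, 3 time, 4 request line, 5 status, 6 bytes, 7 referer, 8 user agent
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) \\S+ (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    // Returns the eight captured fields, or null when the line does not match the assumed layout.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] fields = new String[8];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = m.group(i + 1);
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "
                + "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
        String[] f = parse(line);
        System.out.println(f[0] + " | " + f[2] + " | " + f[3] + " | " + f[4]);
    }
}
```

The project itself splits each line on spaces instead; a regular expression is just one alternative that keeps the quoted fields intact.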

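Pageviews table to be generated (page clickstream model)

A session is one visit by a user to the website.

Visits table to be generated (clickstream model)

(page-visit information aggregated by session)

This is the clickstream model. Once web logs have been converted into clickstream data, many web-analytics metrics become simple to compute; that is the "magic" of clickstream data. Many common web-analytics metrics can be computed from it.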

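Data cleaning

  • Records with an invalid time format
  • Records with a response status code >= 400
  • Requests for static resources (js, css), which are removed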
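The three cleaning rules can be sketched as a single predicate. This is only an illustration under stated assumptions: the class CleaningRulesSketch and its methods are hypothetical, and static resources are recognised here by file suffix, whereas the project code later in this article keeps only requests that appear in a whitelist of page URLs.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Hypothetical helper for illustration; not part of the original project.
class CleaningRulesSketch {

    // True when the timestamp parses as dd/MMM/yyyy:HH:mm:ss with US month names.
    static boolean hasValidTime(String timeLocal) {
        SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        format.setLenient(false); // reject impossible dates such as day 32
        try {
            format.parse(timeLocal);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }

    // Assumption: a static resource is any path ending in .js or .css (query string ignored).
    static boolean isStaticResource(String url) {
        String path = url.toLowerCase(Locale.ROOT);
        int query = path.indexOf('?');
        if (query >= 0) {
            path = path.substring(0, query);
        }
        return path.endsWith(".js") || path.endsWith(".css");
    }

    // A record is kept only if it passes all three rules.
    static boolean keep(String timeLocal, int status, String url) {
        return hasValidTime(timeLocal) && status < 400 && !isStaticResource(url);
    }

    public static void main(String[] args) {
        System.out.println(keep("18/Sep/2021:06:49:18", 200, "/hadoop-hive-intro/"));
        System.out.println(keep("18/Sep/2021:06:49:18", 404, "/about"));
    }
}
```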

2. Collecting log data onto HDFS

Create the directory

# Create the /sx/clickstream/process/input directory
hadoop fs -mkdir -p /sx/clickstream/process/input

Write the script

touch mv2clickstreamprocessinput.sh

Script contents

#!/bin/bash

#
# ===========================================================================
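# Program name:
# Description:    Move files to the preprocessing working directory
# Input params:   Run date
# Target path:    /sx/clickstream/process/input
# Data source:    /home/ubuntu/Code/sx
# Code review:
# Modified by:
# Modified date:
# Reason:
# Change list:
# ===========================================================================

# Directory where the uploaded log files are stored
log_dir=/home/ubuntu/Code/sx

# Log file name
log_name=access.log.fensi

# Working directory of the preprocessing program
log_pre_input=/sx/clickstream/process/input

# Read the log directory and check whether there are files to upload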
#files=`hadoop fs -ls $log_dir | grep $day_01 | wc -l`
files=`ls $log_dir | wc -l`
hadoop fs -mkdir -p ${log_pre_input}
if [ $files -gt 0 ]; then
hadoop fs -put ${log_dir}/${log_name} ${log_pre_input}
echo "success moved ${log_dir}/${log_name} to ${log_pre_input} ....."
fi
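
Add execute permission to the script

chmod u+x mv2clickstreamprocessinput.sh

chmod is the permission-management command; the name is short for "change the permissions mode of a file".

u stands for the owner (user), x stands for execute permission, and + means the permission is added.

chmod u+x file.sh therefore adds execute permission for the owner of file.sh in the current directory.

Run the script

sh mv2clickstreamprocessinput.sh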

3. Data preprocessing with MR: filtering static resources

Create the Maven project

Modify the pom file

   <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.1.2</version>
        <!-- <version>1.2.3</version> -->
    </dependency>
</dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.mystudy.hadoopPro.APP</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <!-- Can build both fat and thin jars -->
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <encoding>UTF-8</encoding>

                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Recall the raw data:

194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"

Write the log entity class

edu.sx.clickstream.pre.WebLogBean

The code is as follows:

package edu.sx.clickstream.pre;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

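/**
 * Layer that interfaces with external data; the table structure should ideally match the external data source.
 * Term: source-aligned table
 * @author
 *
 */
public class WebLogBean implements Writable {

    private boolean valid = true;// whether the record is valid
    private String remote_addr;// client IP address
    private String remote_user;// client user name; "-" means not set
    private String time_local;// access time and time zone
    private String request;// requested URL and HTTP protocol
    private String status;// response status; 200 means success
    private String body_bytes_sent;// size of the response body sent to the client
    private String http_referer;// the page from which the request was linked
    private String http_user_agent;// information about the client browser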


    public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return this.time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    /**
     * \001 is Hive's default field delimiter; it is not a character users can type by hand
     * @return
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.getRemote_addr());
        sb.append("\001").append(this.getRemote_user());
        sb.append("\001").append(this.getTime_local());
        sb.append("\001").append(this.getRequest());
        sb.append("\001").append(this.getStatus());
        sb.append("\001").append(this.getBody_bytes_sent());
        sb.append("\001").append(this.getHttp_referer());
        sb.append("\001").append(this.getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(null==remote_addr?"":remote_addr);
        out.writeUTF(null==remote_user?"":remote_user);
        out.writeUTF(null==time_local?"":time_local);
        out.writeUTF(null==request?"":request);
        out.writeUTF(null==status?"":status);
        out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
        out.writeUTF(null==http_referer?"":http_referer);
        out.writeUTF(null==http_user_agent?"":http_user_agent);

    }

}

Write the data-cleaning utility class

edu.sx.clickstream.pre.WebLogParser

The code is as follows:

package edu.sx.clickstream.pre;


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class WebLogParser {

    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        // Split the line on spaces, then stitch the pieces of a field that was split apart back together
        // 222.66.59.174 - - [18/Sep/2021:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0"
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            // Convert the timestamp to the yyyy-MM-dd HH:mm:ss form
            //   [18/Sep/2021:06:52:32 +0000]
            //   18/Sep/2021:06:52:32 ------> 2021-09-18 06:52:32
            String time_local = formatDate(arr[3].substring(1));
            if(null==time_local || "".equals(time_local)) {
                time_local="-invalid_time-";
            }

            webLogBean.setTime_local(time_local);
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);

            // If the user agent was split into several elements, concatenate them.
            // An array longer than 12 means the last field contained spaces; everything from index 11 on goes into it
            //  "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)"
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for(int i=11;i<arr.length;i++){
                    sb.append(arr[i]);
                }
                webLogBean.setHttp_user_agent(sb.toString());
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            // A status code of 400 or above means the request failed; such records are treated as invalid
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// >= 400: HTTP error
                webLogBean.setValid(false);
            }

            // If the time could not be parsed, the record is also treated as invalid
            if("-invalid_time-".equals(webLogBean.getTime_local())){
                webLogBean.setValid(false);
            }
        } else {
            // 58.215.204.118 - - [18/Sep/2021:06:52:33 +0000] "-" 400 0 "-" "-"
            // If the split yields 11 or fewer elements, the record is incomplete; discard it
            webLogBean=null;
        }

        return webLogBean;
    }

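    /*
     * Filter static resources: mark a record as invalid when its request is not in the set of valid pages
     * */
    public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }


    // Convert the time string; returns null when it cannot be parsed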
    public static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException e) {
            return null;
        }
    }
    
}

Write the preprocessing Mapper class

edu.sx.clickstream.pre.WeblogPreProcessMapper

The code is as follows:

package edu.sx.clickstream.pre;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable>
{
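    // Holds the set of valid page URLs of the website
    Set<String> pages = new HashSet<String>();
    Text k = new Text();
    NullWritable v = NullWritable.get();
    /**
     * Initialization method of the map phase.
     * Loads the site's valid page URLs into the map task's memory so they can be used to filter log records
     * (they are hard-coded here rather than read from an external configuration file).
     * Requests for static resources such as js, css and img are filtered out.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Populate the set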
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of input
        String line = value.toString();
        WebLogBean webLogBean = WebLogParser.parser(line);
        if (webLogBean != null) {
            // Filter out static resources such as js, images and css
            WebLogParser.filtStaticResource(webLogBean, pages);
            if (!webLogBean.isValid()) return;
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }
}

Write the preprocessing Driver class

edu.sx.clickstream.pre.WeblogEtlPreProcessDriver

The code is as follows:

package edu.sx.clickstream.pre;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class WeblogEtlPreProcessDriver {

    static {
        try {
            // Set the HADOOP_HOME directory
            System.setProperty("hadoop.home.dir", "D:/hadoop");
            // Load the native library
            System.load("D:/hadoop/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }


    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        FileInputFormat.addInputPath(job,new Path("file:///D:\\hadoop\\clickstreaminput1"));
        //  FileInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/input/2021-12-10/11-36_192.168.137.128"));

        job.setInputFormatClass(TextInputFormat.class);
        FileOutputFormat.setOutputPath(job,new Path("file:///D:\\hadoop\\clickstreamoutput1"));
//        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/weblogPreOut"));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(WeblogEtlPreProcessDriver.class);

        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        boolean res = job.waitForCompletion(true);

    }
    
}

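4. Data preprocessing with MR: building the pageviews data model

How pageviews works

In GA (Google Analytics), every load of a page counts as one PV. For example, if a user views page A -> page B -> page A and then leaves the site, that visit has a total of 3 PVs.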
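The pageviews model also numbers each page load within a visit. A minimal sketch of that numbering follows, assuming a visit ends when two consecutive hits from the same user are 30 minutes or more apart, which is the threshold used by the reducer in this section. The class SessionSplitSketch and its assign method are hypothetical names, not project code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the original project.
class SessionSplitSketch {

    // Assumed visit timeout: 30 minutes, in milliseconds.
    static final long GAP_MS = 30 * 60 * 1000L;

    // Input: one user's hit times in milliseconds, sorted ascending.
    // Output: for each hit, a pair {visitIndex, step}; step restarts at 1 in every visit.
    static List<int[]> assign(long[] sortedMillis) {
        List<int[]> result = new ArrayList<>();
        int visit = 0;
        int step = 0;
        for (int i = 0; i < sortedMillis.length; i++) {
            // A gap of 30 minutes or more since the previous hit starts a new visit.
            if (i > 0 && sortedMillis[i] - sortedMillis[i - 1] >= GAP_MS) {
                visit++;
                step = 0;
            }
            step++;
            result.add(new int[] {visit, step});
        }
        return result;
    }

    public static void main(String[] args) {
        // Page A, page B, page A within two minutes, then one more hit 31 minutes later.
        long[] hits = {0L, 60_000L, 120_000L, 120_000L + 31 * 60_000L};
        for (int[] pair : assign(hits)) {
            System.out.println("visit=" + pair[0] + " step=" + pair[1]);
        }
    }
}
```

In the A -> B -> A example, the three hits fall into one visit with steps 1 to 3, so the PV count of that visit is its highest step number.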

Create the pageviews Mapper class

edu.sx.clickstream.pageviews.ClickStreamMapper

Code:

package edu.sx.clickstream.pageviews;

import edu.sx.clickstream.pre.WebLogBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

    Text k = new Text();
    WebLogBean v = new WebLogBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = line.split("\001");
        if (fields.length < 9) return;
        // Set the split fields on the WebLogBean
        v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
        // Only valid records go on to further processing
        if (v.isValid()) {
            // The IP address identifies the user here.
            // Using the IP as the map output key sends all records from the same IP to the same reduce call
            k.set(v.getRemote_addr());
            context.write(k, v);
        }
    }
}

Create the pageviews Reducer class

edu.sx.clickstream.pageviews.ClickStreamReducer

Code:

package edu.sx.clickstream.pageviews;

import edu.sx.clickstream.pre.WebLogBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {

        ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();

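        // Iterate over the values; they all belong to the same user (same IP)
        for (WebLogBean bean : values) {
            // Why not add the bean from the loop directly to the list?
            // Hadoop reuses the same object for every element of the iterable, so the list
            // would end up holding references to one object carrying the last record's values.
            // Copying the properties into a new object each time avoids that overwrite
            // (a shallow-copy vs. deep-copy issue in Java).
            WebLogBean webLogBean = new WebLogBean();
            try {
                BeanUtils.copyProperties(webLogBean, bean);
            } catch(Exception e) {
                e.printStackTrace();
            }
            //beans.add(bean);
            beans.add(webLogBean);
        }

        // Sort by time.
        // Once the beans are in chronological order, compute the gap between each record and the previous one:
        // a gap under 30 minutes means the same session, a gap of 30 minutes or more starts a new session.
        // Collecting the WebLogBeans in a list lets us sort them with a custom comparator.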
        Collections.sort(beans, new Comparator<WebLogBean>() {
            @Override
            public int compare(WebLogBean o1, WebLogBean o2) {
                try {
                    Date d1 = toDate(o1.getTime_local());
                    Date d2 = toDate(o2.getTime_local());
                    if (d1 == null || d2 == null)
                        return 0;
                    return d1.compareTo(d2);
                } catch (Exception e) {
                    e.printStackTrace();
                    return 0;
                }
            }

        });


        int step = 1;
        // Use a UUID as the session id
        String session = UUID.randomUUID().toString();

        // After sorting, the records in the list are in chronological order
        for (int i = 0; i < beans.size(); i++) {
            WebLogBean bean = beans.get(i);
            // If there is only one record, output it directly
            if (1 == beans.size()) {
                // Use a default stay duration of 60 s
                v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
                          + bean.getStatus());
                context.write(NullWritable.get(), v);
                session = UUID.randomUUID().toString();
                break;
            }

            // With more than one record, skip the first one here; it is output while processing the second
            if (i == 0) {
                continue;
            }

            // Time difference between this record and the previous one
            long timeDiff = 0;
            try {
                timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // If the gap is under 30 minutes, output the previous page view with the gap as its stay duration
            if (timeDiff < 30 * 60 * 1000) {
                v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                          + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                step++;
            }else {

                // If the gap is 30 minutes or more, output the previous page view, then reset step to start a new visit
                v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                          + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                // After outputting the previous record, reset the step number and start a new session id
                step = 1;
                session = UUID.randomUUID().toString();

            }

            // If this is the last record, output it directly
            if (i == beans.size() - 1) {
                // Use a default stay duration of 60 s
                v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                context.write(NullWritable.get(), v);

            }


        }

    }


    private String toStr(Date date) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        return df.format(date);
    }

    private Date toDate(String timeStr) throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        return df.parse(timeStr);
    }

    private long timeDiff(String time1, String time2) throws ParseException {
        Date d1 = toDate(time1);
        Date d2 = toDate(time2);
        return d1.getTime() - d2.getTime();

    }

    private long timeDiff(Date time1, Date time2) throws ParseException {
        // Date.getTime() returns the time in milliseconds
        return time1.getTime() - time2.getTime();

    }
}

Create the pageviews Driver class

edu.sx.clickstream.pageviews.ClickStreamDriver

Code:

package edu.sx.clickstream.pageviews;

import edu.sx.clickstream.pre.WebLogBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class ClickStreamDriver {

    static {
        try {
            // Set the HADOOP_HOME directory
            System.setProperty("hadoop.home.dir", "D:/hadoop");
            // Load the native library
            System.load("D:/hadoop/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input: the output of the preprocessing job
        FileInputFormat.addInputPath(job,new Path("file:///D:\\hadoop\\clickstreamoutput1\\part-m-00000"));
        //  FileInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/input/2021-12-10/11-36_192.168.137.128"));

        FileOutputFormat.setOutputPath(job,new Path("file:///D:\\hadoop\\clickstreamoutput2"));

        job.setJarByClass(ClickStreamDriver.class);
        job.setMapperClass(ClickStreamMapper.class);
        job.setReducerClass(ClickStreamReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLogBean.class);
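        // The reducer emits NullWritable keys, so declare that as the output key class
        job.setOutputKeyClass(org.apache.hadoop.io.NullWritable.class);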
        job.setOutputValueClass(Text.class);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);

    }
}

The results are written to D:\hadoop\clickstreamoutput2.

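5. Data preprocessing with MR: building the visits data model

How the visits table works

The input data is pageviews:

Each record represents one step of a session (user) visit.

The visits data is:

Each record describes one visit by a user.

  • It uses the session as the key
  • It records the entry page of the session
  • It records the exit page of the session
  • It is derived from the pageviews clickstream data.

How the data format is produced

  • Use the session as the key
  • Use step as the sort field
  • Entry-page record = the record whose step equals 1
  • Exit-page record = the record at index (number of pages in the session - 1), i.e. the last one
  • Page count = the number of pages visited in the session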
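The rules above can be sketched in plain Java, outside of MapReduce, for the page views of a single session. The class VisitSummarySketch, its nested PageView type and the summarize method are hypothetical names for illustration; they assume each page view carries only a step, a time string and a page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration; not part of the original project.
class VisitSummarySketch {

    // Minimal stand-in for one pageviews record of a session.
    static final class PageView {
        final int step;
        final String time;
        final String page;

        PageView(int step, String time, String page) {
            this.step = step;
            this.time = time;
            this.page = page;
        }
    }

    // Returns {inTime, outTime, inPage, outPage, pageCount} for one non-empty session.
    static String[] summarize(List<PageView> views) {
        List<PageView> sorted = new ArrayList<>(views);
        // Sort by step so the entry page comes first and the exit page last.
        sorted.sort(Comparator.comparingInt((PageView pv) -> pv.step));
        PageView first = sorted.get(0);
        PageView last = sorted.get(sorted.size() - 1);
        return new String[] {
                first.time, last.time, first.page, last.page, String.valueOf(sorted.size())
        };
    }

    public static void main(String[] args) {
        List<PageView> views = Arrays.asList(
                new PageView(2, "2021-09-18 06:50:00", "/hadoop-hive-intro/"),
                new PageView(1, "2021-09-18 06:49:18", "/about"),
                new PageView(3, "2021-09-18 06:52:10", "/about"));
        System.out.println(String.join(" | ", summarize(views)));
    }
}
```

In the MapReduce version, the grouping by session is done by the shuffle (session as the map output key), and the sort and summary happen per key in the reducer.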

Generate the visits data

Visits MR: entity class PageViewsBean

Create the entity class

edu.sx.clickstream.visits.PageViewsBean

Code:

package edu.sx.clickstream.visits;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PageViewsBean implements Writable {

    private String session;
    private String remote_addr;
    private String timestr;
    private String request;
    private int step;
    private String staylong;
    private String referal;
    private String useragent;
    private String bytes_send;
    private String status;

    public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.useragent = useragent;
        this.timestr = timestr;
        this.request = request;
        this.step = step;
        this.staylong = staylong;
        this.referal = referal;
        this.bytes_send = bytes_send;
        this.status = status;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getTimestr() {
        return timestr;
    }

    public void setTimestr(String timestr) {
        this.timestr = timestr;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStep() {
        return step;
    }

    public void setStep(int step) {
        this.step = step;
    }

    public String getStaylong() {
        return staylong;
    }

    public void setStaylong(String staylong) {
        this.staylong = staylong;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public String getUseragent() {
        return useragent;
    }

    public void setUseragent(String useragent) {
        this.useragent = useragent;
    }

    public String getBytes_send() {
        return bytes_send;
    }

    public void setBytes_send(String bytes_send) {
        this.bytes_send = bytes_send;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.timestr = in.readUTF();
        this.request = in.readUTF();
        this.step = in.readInt();
        this.staylong = in.readUTF();
        this.referal = in.readUTF();
        this.useragent = in.readUTF();
        this.bytes_send = in.readUTF();
        this.status = in.readUTF();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(timestr);
        out.writeUTF(request);
        out.writeInt(step);
        out.writeUTF(staylong);
        out.writeUTF(referal);
        out.writeUTF(useragent);
        out.writeUTF(bytes_send);
        out.writeUTF(status);

    }

}

Visits MR: entity class VisitBean

Create the entity class

edu.sx.clickstream.visits.VisitBean

Code:

package edu.sx.clickstream.visits;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class VisitBean implements Writable {

    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;

    private int pageVisits;

    public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getInTime() {
        return inTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public String getOutTime() {
        return outTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getInPage() {
        return inPage;
    }

    public void setInPage(String inPage) {
        this.inPage = inPage;
    }

    public String getOutPage() {
        return outPage;
    }

    public void setOutPage(String outPage) {
        this.outPage = outPage;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public int getPageVisits() {
        return pageVisits;
    }

    public void setPageVisits(int pageVisits) {
        this.pageVisits = pageVisits;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);

    }

    @Override
    public String toString() {
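        // Join the fields with \001 (Ctrl-A), Hive's default field delimiter
        return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;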
    }
}
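
The write and readFields methods above only work as a pair if both handle the fields in the same order. The following is a minimal sketch of that round trip using only java.io, so it runs without Hadoop on the classpath. MiniVisit and roundTrip are hypothetical names introduced for illustration; MiniVisit is a trimmed-down stand-in, not the real VisitBean.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VisitRoundTrip {

    // Hypothetical, trimmed-down stand-in for VisitBean (three fields only).
    static class MiniVisit {
        String session;
        String inPage;
        int pageVisits;

        // Mirrors VisitBean.write: fields are written in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeUTF(session);
            out.writeUTF(inPage);
            out.writeInt(pageVisits);
        }

        // Mirrors VisitBean.readFields: fields are read back in the same order.
        void readFields(DataInput in) throws IOException {
            session = in.readUTF();
            inPage = in.readUTF();
            pageVisits = in.readInt();
        }
    }

    // Serialize a record to bytes, then read the bytes into a fresh object.
    static MiniVisit roundTrip(MiniVisit original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            MiniVisit copy = new MiniVisit();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniVisit visit = new MiniVisit();
        visit.session = "session-1";                 // made-up sample values
        visit.inPage = "/hadoop-zookeeper-intro/";
        visit.pageVisits = 3;
        MiniVisit copy = roundTrip(visit);
        System.out.println(copy.session + " " + copy.inPage + " " + copy.pageVisits);
    }
}
```

Note that writeUTF throws a NullPointerException when it is given null, so every String field of a bean has to be set before the bean is written.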

Generating Visits data with MR: Mapper class ClickStreamVisitMapper

Create the Mapper class

edu.sx.clickstream.visits.ClickStreamVisitMapper

Code:

package edu.sx.clickstream.visits;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {

    PageViewsBean pvBean = new PageViewsBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // The pageviews output separates its fields with \001 (Ctrl-A)
        String[] fields = line.split("\001");
        int step = Integer.parseInt(fields[5]);
        //(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
        // Sample record (the \001 separators are invisible here):
        //299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200
        pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
        // Use the session as the map output key, so all page views of one session reach the same reduce call as one group
        k.set(pvBean.getSession());
        context.write(k, pvBean);
    }


}
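
The mapper depends on each pageviews record being split on the \001 character. Here is a small sketch of that splitting step in plain Java. PageViewLineParser, DELIMITER and parse are hypothetical names, the session id is made up, and the field layout (step at index 5) is an assumption taken from the fields[5] access in the mapper.

```java
public class PageViewLineParser {

    // Ctrl-A (U+0001), written in Java as the octal escape \001.
    static final String DELIMITER = "\001";

    // Split one record into fields; the limit -1 keeps trailing empty fields.
    static String[] parse(String line) {
        return line.split(DELIMITER, -1);
    }

    public static void main(String[] args) {
        // Hypothetical record built from made-up and sample values.
        String line = String.join(DELIMITER,
                "session-1", "72.46.128.140", "-", "2013-09-18 07:58:50",
                "/hadoop-zookeeper-intro/", "1", "60");
        String[] fields = parse(line);
        int step = Integer.parseInt(fields[5]);
        System.out.println(fields.length + " fields, step = " + step);
    }
}
```

Passing -1 as the limit is a design choice of this sketch: with the one-argument split used in the mapper, empty fields at the end of a line are dropped, which shifts the array length for records whose last columns are empty.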

Generating Visits data with MR: Reducer class ClickStreamVisitReducer

Create the Reducer class

edu.sx.clickstream.visits.ClickStreamVisitReducer

Code:

package edu.sx.clickstream.visits;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

public class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

    @Override
    protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {

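        // Copy the pvBeans into a list so they can be sorted by step;
        // Hadoop reuses the value object while iterating, so each bean is copied before it is stored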
        ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
        for (PageViewsBean pvBean : pvBeans) {
            PageViewsBean bean = new PageViewsBean();
            try {
                BeanUtils.copyProperties(bean, pvBean);
                pvBeansList.add(bean);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
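        /**
         * Sort the records by step, which gives the order in which the pages were visited
         */
        Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
            @Override
            public int compare(PageViewsBean o1, PageViewsBean o2) {
                // Integer.compare returns 0 for equal steps, as the Comparator contract requires
                return Integer.compare(o1.getStep(), o2.getStep());
            }
        });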

        // Take the first and last pageview records of this visit and put the data into the VisitBean
        VisitBean visitBean = new VisitBean();
        // First record of the visit
        visitBean.setInPage(pvBeansList.get(0).getRequest());
        visitBean.setInTime(pvBeansList.get(0).getTimestr());
        // Last record of the visit
        visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
        visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
        // Number of pages viewed during the visit
        visitBean.setPageVisits(pvBeansList.size());
        // Visitor's IP address
        visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
        // Referal of this visit
        visitBean.setReferal(pvBeansList.get(0).getReferal());
        visitBean.setSession(session.toString());
        context.write(NullWritable.get(), visitBean);
    }
}
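
The core of the reducer is independent of Hadoop: sort one session's page views by step, then read the entry page from the first record and the exit page from the last. The sketch below shows that logic on a plain list. PageView and summarize are hypothetical names, PageView is a minimal stand-in for PageViewsBean, and the sample values are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class VisitSummarySketch {

    // Hypothetical, minimal stand-in for PageViewsBean.
    static class PageView {
        final String request;
        final String timestr;
        final int step;

        PageView(String request, String timestr, int step) {
            this.request = request;
            this.timestr = timestr;
            this.step = step;
        }
    }

    // Returns {inPage, inTime, outPage, outTime, pageVisits} for one session.
    static String[] summarize(List<PageView> views) {
        // Sort a copy so the caller's list is left untouched.
        List<PageView> sorted = new ArrayList<>(views);
        sorted.sort(Comparator.comparingInt(v -> v.step));
        PageView first = sorted.get(0);
        PageView last = sorted.get(sorted.size() - 1);
        return new String[] {
            first.request, first.timestr,
            last.request, last.timestr,
            String.valueOf(sorted.size())
        };
    }

    public static void main(String[] args) {
        // Records arrive in arbitrary order; step decides the visit order.
        List<PageView> views = Arrays.asList(
                new PageView("/b", "2013-09-18 07:59:50", 2),
                new PageView("/a", "2013-09-18 07:58:50", 1));
        System.out.println(String.join(" | ", summarize(views)));
    }
}
```

Like the reducer, this sketch assumes the list holds at least one record; an empty list would make get(0) throw an IndexOutOfBoundsException.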

Generating Visits data with MR: Driver class ClickStreamVisitDriver

Create the Driver class

edu.sx.clickstream.visits.ClickStreamVisitDriver

Code:

package edu.sx.clickstream.visits;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class ClickStreamVisitDriver {

    static {
        try {
            // Set the HADOOP_HOME directory
            System.setProperty("hadoop.home.dir", "D:/hadoop");
            // Load the native library
            System.load("D:/hadoop/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
//        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/pageViewOut"));
//        TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/clickStreamVisit2"));

        // Backslashes must be escaped inside Java string literals
        FileInputFormat.addInputPath(job,new Path("file:///D:\\hadoop\\clickstreamoutput2\\part-r-00000"));
        FileOutputFormat.setOutputPath(job,new Path("file:///D:\\hadoop\\clickstreamoutput3"));

        job.setJarByClass(ClickStreamVisitDriver.class);
        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageViewsBean.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);

    }
}
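
The driver hard-codes local Windows paths and keeps the HDFS paths as commented-out lines. One possible alternative, sketched here under the assumption that the job may be started with two command-line arguments, is to take the input and output paths from args and fall back to the local defaults. DriverPathsSketch and resolvePaths are hypothetical names and are not part of the original project.

```java
public class DriverPathsSketch {

    // Defaults copied from the local paths used in the driver.
    static final String DEFAULT_INPUT = "file:///D:\\hadoop\\clickstreamoutput2\\part-r-00000";
    static final String DEFAULT_OUTPUT = "file:///D:\\hadoop\\clickstreamoutput3";

    // Returns {inputPath, outputPath}: args[0] and args[1] if given, else the defaults.
    static String[] resolvePaths(String[] args) {
        if (args.length >= 2) {
            return new String[] { args[0], args[1] };
        }
        return new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
    }

    public static void main(String[] args) {
        String[] paths = resolvePaths(args);
        System.out.println("input:  " + paths[0]);
        System.out.println("output: " + paths[1]);
    }
}
```

In the driver, the two resolved strings would be wrapped in new Path(...) and passed to FileInputFormat.addInputPath and FileOutputFormat.setOutputPath, so the same jar can run against local files or HDFS without recompiling.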

After the job finishes, the visits records are written to D:\hadoop\clickstreamoutput3\part-r-00000.

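Import into HDFS

The preprocessed data with static-resource requests removed is at D:\hadoop\clickstreamoutput1\part-m-00000; make a copy and rename it clickstreamdata-pre.

The preprocessed pageviews model data is at D:\hadoop\clickstreamoutput2\part-r-00000; rename it clickstreamdata-pageviews.

The preprocessed visits model data is at D:\hadoop\clickstreamoutput3\part-r-00000; rename it clickstreamdata-visits.

Upload these three files to the lsn virtual machine (default path /home/ubuntu/Code), then upload them from the Linux file system to HDFS under /sx/clickstream.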

hadoop fs -mkdir -p /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-pre /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-pageviews /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-visits /sx/clickstream

Summary

This part read the log data, preprocessed it, and produced the pageviews data model and the visits data model. The next step is to load these results into Hive.
