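3.3 Data Analysis
Our data has now been fully collected into the HBase cluster. The next step is to analyze it and compute the results we want. Note that we will not necessarily use one MapReduce job per business metric. Where the scenario allows, a single MapReduce job computes several metrics at once. When to use which approach is discussed in detail later.
Analysis module flowchart:
Business metrics:
a) Per user, per day: number of outgoing calls and total call duration.
b) Per user, per month: number of call records and total call duration.
c) Intimacy between users (call count and call duration indicate how close two users are).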
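3.3.1 MySQL Table Design
We store the analysis results in MySQL so that the web tier can query and display them.
Design notes:
1) Table: db_telecom.tb_call
Stores the total call count and total call duration for a given [contact dimension] and a given [date dimension].
2) Table: db_telecom.tb_dimension_contacts
Stores the [contact dimension] data (the user's phone number and name).
3) Table: db_telecom.tb_dimension_date
Stores the [date dimension] data (year, month, day).
4) Table: db_telecom.tb_intimacy
Stores the [user relationship] results for all users. (Used in the exercise.)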
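3.3.2 Requirement: Call Statistics by Dimension
The table structure above follows from the requirement. For a given contact and a given time range (year, month, day), we use MapReduce to compute the total call count and total call duration of every phone number within that range.
Approach:
a) A dimension is an angle or viewpoint from which the data is described. Take the date dimension: to count the call records for every month and every day of 2017, the dimension can be written roughly as "2017, any month, any day".
b) The Mapper groups records by dimension and passes them to the Reducer.
c) The Reducer receives the records grouped by each dimension, sums them up, and emits the result.
d) As the business requires, the Reducer output is written to MySQL through a custom OutputFormat.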
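The flow in steps a) to c) can be tried out without a cluster. The following is a minimal in-memory sketch, not the project's MapReduce code: the class name, the string key layout, and the record format are assumptions made for illustration. Each record is emitted under a year key, a month key, and a day key (with -1 marking an unused level), and records sharing a key are counted and summed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory sketch: emit one key per dimension, then count and sum per key. */
class DimensionAggregationSketch {

    /** Builds a "phone_year_month_day" key; -1 marks a level that is not used. */
    static String key(String phone, int year, int month, int day) {
        return phone + "_" + year + "_" + month + "_" + day;
    }

    /**
     * Each record is {phone, yyyyMMdd, duration}.
     * Returns key -> {callCount, durationSum}.
     */
    static Map<String, int[]> aggregate(String[][] records) {
        Map<String, int[]> totals = new LinkedHashMap<>();
        for (String[] r : records) {
            String phone = r[0];
            int year = Integer.parseInt(r[1].substring(0, 4));
            int month = Integer.parseInt(r[1].substring(4, 6));
            int day = Integer.parseInt(r[1].substring(6, 8));
            int duration = Integer.parseInt(r[2]);
            // "Map" step: the same record is emitted under the year, month and day keys.
            String[] keys = {
                key(phone, year, -1, -1),
                key(phone, year, month, -1),
                key(phone, year, month, day)
            };
            // "Reduce" step: values sharing a key are counted and summed.
            for (String k : keys) {
                int[] t = totals.computeIfAbsent(k, x -> new int[2]);
                t[0]++;
                t[1] += duration;
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"15837312345", "20170810", "180"},
            {"15837312345", "20170811", "60"}
        };
        aggregate(records).forEach((k, v) ->
            System.out.println(k + " -> calls=" + v[0] + ", duration=" + v[1]));
    }
}
```

In the real job the key is a WritableComparable object rather than a string, and the grouping is done by the shuffle phase instead of a map.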
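Data input: HBase. Data output: MySQL.
Structure of the source data in HBase:
Steps:
a) With the goal known, first check whether the existing data can actually support it;
b) Design the MySQL tables from the target data structure and create them;
c) Decide which functional modules the code needs and create a package for each;
d) Data is always described from some dimension (viewpoint), so build dimension classes. For example, using the combination of "phone number" and "year" as the key to group all records yields the results for that phone number in that year.
e) Write a custom OutputFormat that connects to MySQL and writes the output.
f) Create the supporting utility classes.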
Creating the MySQL result tables
/*
Navicat MySQL Data Transfer
Source Server : 192.168.25.102
Source Server Version : 50173
Source Host : 192.168.25.102:3306
Source Database : db_telecom
Target Server Type : MYSQL
Target Server Version : 50173
File Encoding : 65001
Date: 2019-03-19 16:11:44
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for tb_call
-- ----------------------------
DROP TABLE IF EXISTS `tb_call`;
CREATE TABLE `tb_call` (
`id_contact_date` varchar(255) NOT NULL,
`id_dimension_contact` int(11) NOT NULL,
`id_dimension_date` int(11) NOT NULL,
`call_sum` int(11) NOT NULL,
`call_duration_sum` int(11) NOT NULL,
PRIMARY KEY (`id_contact_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for tb_dimension_contacts
-- ----------------------------
DROP TABLE IF EXISTS `tb_dimension_contacts`;
CREATE TABLE `tb_dimension_contacts` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`telephone` varchar(255) NOT NULL,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for tb_dimension_date
-- ----------------------------
DROP TABLE IF EXISTS `tb_dimension_date`;
CREATE TABLE `tb_dimension_date` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`year` int(11) NOT NULL,
`month` int(11) NOT NULL,
`day` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for tb_intimacy
-- ----------------------------
DROP TABLE IF EXISTS `tb_intimacy`;
CREATE TABLE `tb_intimacy` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`intimacy_rank` int(11) NOT NULL,
`contact_id1` int(11) NOT NULL,
`contact_id2` int(11) NOT NULL,
`call_count` int(11) NOT NULL,
`call_duration_count` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
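Note: why are the table and column names wrapped in backticks? In MySQL the backtick is the identifier quote character. Quoting lets a name coincide with a keyword (year, month and day are all MySQL keywords) or contain special characters without being parsed as SQL syntax. It has no effect on execution speed. The script here was exported by Navicat, which quotes every identifier. Separately, SQL keywords are case-insensitive in MySQL, while the case sensitivity of database and table names depends on the server platform and the lower_case_table_names setting.
Use Navicat to create the database and tables, as shown below: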
3.3.3 Environment Setup
1) In IDEA, create a new module named ct_analysis. The pom.xml is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.china</groupId>
<artifactId>ct_analysis</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- Skip running the tests when packaging -->
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>
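2) Create the package structure under the root package com.china, as shown in the figure below:
3) The list of classes is shown in the figure below:
3.3.4 Writing the Code: Data Analysis
1) Create class CountDurationMapper (the Mapper for the analysis job, extends TableMapper)
package com.china.analysis.mapper;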
import com.china.analysis.kv.key.ComDimension;
import com.china.analysis.kv.key.ContactDimension;
import com.china.analysis.kv.key.DateDimension;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author chenmingjun
* 2019-03-19 10:26
*/
public class CountDurationMapper extends TableMapper<ComDimension, Text> {
private ComDimension comDimension = new ComDimension();
private Text durationText = new Text();
// Maps each contact's phone number to a name
private Map<String, String> phoneNameMap = null;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
phoneNameMap = new HashMap<>();
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// Sample row key: 01_15837312345_20170810141024_13738909097_1_0180
// Read the row key
String rowKey = Bytes.toString(value.getRow());
// Split it into fields
String[] splits = rowKey.split("_");
// Only caller-side rows are needed, so rows whose flag is "0" are skipped
String flag = splits[4];
if (flag.equals("0")) return;
String call1 = splits[1];
String call2 = splits[3];
String buildTime = splits[2];
String duration = splits[5];
durationText.set(duration);
int year = Integer.parseInt(buildTime.substring(0, 4));
int month = Integer.parseInt(buildTime.substring(4, 6));
int day = Integer.parseInt(buildTime.substring(6, 8));
// Build the date dimensions (DateDimension); -1 marks a level that is not used
DateDimension yearDimension = new DateDimension(year, -1, -1);
DateDimension monthDimension = new DateDimension(year, month, -1);
DateDimension dayDimension = new DateDimension(year, month, day);
// Build the contact dimensions (ContactDimension)
ContactDimension call1ContactDimension = new ContactDimension(call1, phoneNameMap.get(call1)); // In production you would either leave the name out, or read the phone-to-name mapping from HBase here in the Mapper.
ContactDimension call2ContactDimension = new ContactDimension(call2, phoneNameMap.get(call2)); // While learning, we hard-code the mapping to keep the output readable and the code short.
// Build the combined dimension (ComDimension)
// Aggregate for the caller
comDimension.setContactDimension(call1ContactDimension);
// Year
comDimension.setDateDimension(yearDimension);
context.write(comDimension, durationText);
// Month
comDimension.setDateDimension(monthDimension);
context.write(comDimension, durationText);
// Day
comDimension.setDateDimension(dayDimension);
context.write(comDimension, durationText);
// Aggregate for the callee
comDimension.setContactDimension(call2ContactDimension);
// Year
comDimension.setDateDimension(yearDimension);
context.write(comDimension, durationText);
// Month
comDimension.setDateDimension(monthDimension);
context.write(comDimension, durationText);
// Day
comDimension.setDateDimension(dayDimension);
context.write(comDimension, durationText);
}
}
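The row-key parsing done inside map() can be checked in isolation. The sketch below assumes the six-field layout visible in the sample key from the Mapper (prefix, caller, build time, callee, flag, duration). The class and field names are hypothetical and not part of the project.

```java
/** Sketch: split a call-log row key into typed fields. */
class RowKeyParseSketch {

    /** Plain holder for the parsed fields (hypothetical helper type). */
    static final class ParsedCall {
        final String caller;
        final String callee;
        final String flag;
        final int year;
        final int month;
        final int day;
        final int duration;

        ParsedCall(String caller, String callee, String flag,
                   int year, int month, int day, int duration) {
            this.caller = caller;
            this.callee = callee;
            this.flag = flag;
            this.year = year;
            this.month = month;
            this.day = day;
            this.duration = duration;
        }
    }

    static ParsedCall parse(String rowKey) {
        String[] parts = rowKey.split("_");
        // Fail early on malformed keys instead of hitting an index error later.
        if (parts.length != 6 || parts[2].length() < 8) {
            throw new IllegalArgumentException("Unexpected row key layout: " + rowKey);
        }
        String buildTime = parts[2]; // yyyyMMddHHmmss
        return new ParsedCall(
            parts[1],
            parts[3],
            parts[4],
            Integer.parseInt(buildTime.substring(0, 4)),
            Integer.parseInt(buildTime.substring(4, 6)),
            Integer.parseInt(buildTime.substring(6, 8)),
            Integer.parseInt(parts[5])); // leading zeros are accepted, "0180" parses as 180
    }

    public static void main(String[] args) {
        ParsedCall c = parse("01_15837312345_20170810141024_13738909097_1_0180");
        System.out.println(c.caller + " -> " + c.callee + " on "
            + c.year + "-" + c.month + "-" + c.day + ", duration " + c.duration);
    }
}
```

Validating the field count up front is a design choice of this sketch; the Mapper above indexes into the array directly.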
2) Create class CountDurationReducer (the Reducer for the analysis job, extends Reducer)
package com.china.analysis.reducer;
import com.china.analysis.kv.key.ComDimension;
import com.china.analysis.kv.value.CountDurationValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author chenmingjun
* 2019-03-19 16:30
*/
public class CountDurationReducer extends Reducer<ComDimension, Text, ComDimension, CountDurationValue> {
private CountDurationValue countDurationValue = new CountDurationValue();
@Override
protected void reduce(ComDimension key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int callSum = 0;
int callDurationSum = 0;
for (Text text : values) {
callSum++;
callDurationSum += Integer.parseInt(text.toString());
}
countDurationValue.setCallSum(callSum);
countDurationValue.setCallDurationSum(callDurationSum);
context.write(key, countDurationValue);
}
}
3) Create class CountDurationRunner (the driver class that assembles the Job)
package com.china.analysis.runner;
import com.china.analysis.kv.key.ComDimension;
import com.china.analysis.kv.value.CountDurationValue;
import com.china.analysis.mapper.CountDurationMapper;
import com.china.analysis.outputformat.MySQLOutputFormat;
import com.china.analysis.reducer.CountDurationReducer;
import com.china.constants.Constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @author chenmingjun
* 2019-03-19 16:41
*/
public class CountDurationRunner implements Tool {
private Configuration conf = null;
@Override
public void setConf(Configuration conf) { // By default conf is loaded from the resources on the classpath, in this order:
this.conf = HBaseConfiguration.create(conf); // core-default.xml -> core-site.xml -> hdfs-default.xml -> hdfs-site.xml -> hbase-default.xml -> hbase-site.xml
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public int run(String[] strings) throws Exception {
// conf has already been injected through setConf
// Create the Job
Job job = Job.getInstance(conf, "CALLLOG_ANALYSIS");
job.setJarByClass(CountDurationRunner.class);
// Wire up the Mapper and InputFormat (the input must use the TableInputFormat provided by HBase or a custom InputFormat)
initHBaseInputConfig(job);
// Wire up the Reducer and OutputFormat
initReducerOutputConfig(job);
return job.waitForCompletion(true) ? 0 : 1;
}
private void initHBaseInputConfig(Job job) {
Connection conn = null;
Admin admin = null;
try {
conn = ConnectionFactory.createConnection(conf);
admin = conn.getAdmin();
if (!admin.tableExists(TableName.valueOf(Constants.SCAN_TABLE_NAME))) {
throw new RuntimeException("Target table not found");
}
Scan scan = new Scan();
// The Scan can be tuned further here
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(Constants.SCAN_TABLE_NAME));
TableMapReduceUtil.initTableMapperJob(
Constants.SCAN_TABLE_NAME, // name of the source table
scan, // the Scan that controls what is read
CountDurationMapper.class, // Mapper class
ComDimension.class, // Mapper output key type
Text.class, // Mapper output value type
job, // the Job to configure
true // addDependencyJars
);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (admin != null) {
admin.close();
}
if (conn != null && !conn.isClosed()) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void initReducerOutputConfig(Job job) {
job.setReducerClass(CountDurationReducer.class);
job.setOutputKeyClass(ComDimension.class);
job.setOutputValueClass(CountDurationValue.class);
job.setOutputFormatClass(MySQLOutputFormat.class);
}
public static void main(String[] args) {
int status = 1;
try {
status = ToolRunner.run(new CountDurationRunner(), args);
// Report the result before exiting; statements after System.exit never run
System.out.println(status == 0 ? "Job succeeded" : "Job failed");
} catch (Exception e) {
System.out.println("Job failed");
e.printStackTrace();
}
System.exit(status);
}
}
Note: by default conf is loaded from the resources on the classpath, in this order: core-default.xml -> core-site.xml -> hdfs-default.xml -> hdfs-site.xml -> hbase-default.xml -> hbase-site.xml
4) Create class MySQLOutputFormat (a custom OutputFormat that writes to MySQL)
package com.china.analysis.outputformat;
import com.china.analysis.converter.impl.DimensionConverterImpl;
import com.china.analysis.kv.base.BaseDimension;
import com.china.analysis.kv.base.BaseValue;
import com.china.analysis.kv.key.ComDimension;
import com.china.analysis.kv.value.CountDurationValue;
import com.china.constants.Constants;
import com.china.utils.JDBCCacheBean;
import com.china.utils.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @author chenmingjun
* 2019-03-19 19:01
*/
public class MySQLOutputFormat extends OutputFormat<BaseDimension, BaseValue> {
private OutputCommitter committer = null;
@Override
public RecordWriter<BaseDimension, BaseValue> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// Obtain the JDBC connection
Connection conn = null;
try {
conn = JDBCCacheBean.getInstance();
// Disable auto-commit so that rows can be committed in batches
conn.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
return new MysqlRecordWriter(conn);
}
@Override
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
// Validate the output specification (nothing to check here)
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// Modeled on how the Hadoop source code (FileOutputFormat) builds its committer
if (committer == null) {
String name = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR);
Path outputPath = name == null ? null : new Path(name);
committer = new FileOutputCommitter(outputPath, taskAttemptContext);
}
return committer;
}
static class MysqlRecordWriter extends RecordWriter<BaseDimension, BaseValue> {
private Connection conn = null;
private DimensionConverterImpl dci = null;
private PreparedStatement ps = null;
private String insertSQL = null;
private int count = 0;
private int batchNumber = 0;
public MysqlRecordWriter(Connection conn) {
this.conn = conn;
this.dci = new DimensionConverterImpl();
this.batchNumber = Constants.JDBC_DEFAULT_BATCH_NUMBER;
}
@Override
public void write(BaseDimension key, BaseValue value) throws IOException, InterruptedException {
try {
// Write one row into the MySQL table tb_call
// tb_call: id_contact_date, id_dimension_contact, id_dimension_date, call_sum, call_duration_sum
// Build the SQL statement once
if (insertSQL == null) {
insertSQL = "INSERT INTO `tb_call` (`id_contact_date`, `id_dimension_contact`, `id_dimension_date`, `call_sum`, `call_duration_sum`) VALUES (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id_contact_date`=?;";
}
// Prepare the statement once
if (ps == null) {
ps = conn.prepareStatement(insertSQL);
}
ComDimension comDimension = (ComDimension) key;
CountDurationValue countDurationValue = (CountDurationValue) value;
// Resolve the values to write
int id_dimension_contact = dci.getDimensionId(comDimension.getContactDimension());
int id_dimension_date = dci.getDimensionId(comDimension.getDateDimension());
String id_contact_date = id_dimension_contact + "_" + id_dimension_date;
int call_sum = countDurationValue.getCallSum();
int call_duration_sum = countDurationValue.getCallDurationSum();
// Bind the parameters of this statement (JDBC parameter indexes start at 1)
int i = 0;
ps.setString(++i, id_contact_date);
ps.setInt(++i, id_dimension_contact);
ps.setInt(++i, id_dimension_date);
ps.setInt(++i, call_sum);
ps.setInt(++i, call_duration_sum);
// Value for ON DUPLICATE KEY UPDATE: update if the key exists, insert otherwise
ps.setString(++i, id_contact_date);
ps.addBatch();
// Count the statements buffered for batch execution
count++;
if (count >= this.batchNumber) {
// Execute the batch
ps.executeBatch();
// Commit the transaction
conn.commit();
count = 0;
}
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
try {
if (ps != null) {
ps.executeBatch();
this.conn.commit();
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
JDBCUtil.close(conn, ps, null);
}
}
}
}
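The batching logic of the record writer (buffer rows, flush when the counter reaches the threshold, flush the remainder on close) can be exercised without a database. The sketch below is a simplified stand-in under stated assumptions: BatchWriterSketch and its methods are hypothetical names, and an in-memory list takes the place of executeBatch() and commit().

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of threshold-based batching with a final flush on close. */
class BatchWriterSketch {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BatchWriterSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Same composite key shape as id_contact_date: "<contactId>_<dateId>". */
    static String compositeId(int contactId, int dateId) {
        return contactId + "_" + dateId;
    }

    /** Buffers one row and flushes as soon as the threshold is reached. */
    void write(int contactId, int dateId) {
        pending.add(compositeId(contactId, dateId));
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Flushes whatever is left; without this the last partial batch is lost. */
    void close() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    /** Stand-in for executeBatch() followed by commit(). */
    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchWriterSketch writer = new BatchWriterSketch(2);
        for (int i = 1; i <= 5; i++) {
            writer.write(i, 1);
        }
        writer.close();
        System.out.println(writer.flushedBatches());
    }
}
```

The final flush in close() mirrors what MysqlRecordWriter.close does for the rows that never reached the batch threshold.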
5) Create class BaseDimension (base class for dimension keys, kept for extensibility)
package com.china.analysis.kv.base;
import org.apache.hadoop.io.WritableComparable;
/**
* @author chenmingjun
* 2019-03-19 10:42
*/
public abstract class BaseDimension implements WritableComparable<BaseDimension> {}
6) Create class BaseValue (base class for values, kept for extensibility)
package com.china.analysis.kv.base;
import org.apache.hadoop.io.Writable;
/**
* @author chenmingjun
* 2019-03-19 10:43
*/
public abstract class BaseValue implements Writable {}
7) Create class ContactDimension (the contact dimension; part of the key emitted by the Mapper)
package com.china.analysis.kv.key;
import com.china.analysis.kv.base.BaseDimension;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Contact dimension class
*
* @author chenmingjun
* 2019-03-19 10:49
*/
public class ContactDimension extends BaseDimension {
// Contact dimension: primary key
private int id;
// Contact dimension: phone number
private String telephone;
// Contact dimension: name
private String name;
public ContactDimension() {
super();
}
public ContactDimension(String telephone, String name) {
super();
this.telephone = telephone;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTelephone() {
return telephone;
}
public void setTelephone(String telephone) {
this.telephone = telephone;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ContactDimension that = (ContactDimension) o;
if (telephone != null ? !telephone.equals(that.telephone) : that.telephone != null) return false;
return name != null ? name.equals(that.name) : that.name == null;
}
@Override
public int hashCode() {
int result = telephone != null ? telephone.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
@Override
public int compareTo(BaseDimension o) {
if (o == this) return 0;
ContactDimension anotherContactDimension = (ContactDimension) o;
int result = Integer.compare(this.id, anotherContactDimension.getId());
if (result != 0) return result;
result= this.telephone.compareTo(anotherContactDimension.getTelephone());
if (result != 0) return result;
result = this.name.compareTo(anotherContactDimension.getName());
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.id);
dataOutput.writeUTF(this.telephone);
dataOutput.writeUTF(this.name);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readInt();
this.telephone = dataInput.readUTF();
this.name = dataInput.readUTF();
}
@Override
public String toString() {
return "ContactDimension{" +
"id=" + id +
", telephone='" + telephone + '\'' +
", name='" + name + '\'' +
'}';
}
}
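The write/readFields pair only works if both sides handle the fields in exactly the same order. This can be verified with plain java.io streams, since Hadoop passes DataOutput and DataInput implementations to those methods. The sketch below is an illustration, not the project's class: the helper names are hypothetical. It also substitutes an empty string for a null name, because DataOutputStream.writeUTF throws a NullPointerException for null, which matters when a phone number has no entry in the name map.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Sketch: serialize and deserialize fields in the same order, Writable style. */
class WritableRoundTripSketch {

    /** Writes id, telephone, name in that order; a null name becomes "". */
    static byte[] write(int id, String telephone, String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);
            out.writeUTF(telephone);
            out.writeUTF(name == null ? "" : name); // writeUTF rejects null
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the order they were written. */
    static String[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int id = in.readInt();
            String telephone = in.readUTF();
            String name = in.readUTF();
            return new String[] {String.valueOf(id), telephone, name};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] fields = read(write(7, "15837312345", null));
        System.out.println(String.join(",", fields));
    }
}
```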
8) Create class DateDimension (the date dimension; part of the key emitted by the Mapper)
package com.china.analysis.kv.key;
import com.china.analysis.kv.base.BaseDimension;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Date dimension class
*
* @author chenmingjun
* 2019-03-19 11:36
*/
public class DateDimension extends BaseDimension {
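// Date dimension: primary key
private int id;
// Date dimension: year of the call
private int year;
// Date dimension: month of the call; -1 when aggregating by year
private int month;
// Date dimension: day of the call; -1 when aggregating by year or by month
private int day;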
public DateDimension() {
super();
}
public DateDimension(int year, int month, int day) {
super();
this.year = year;
this.month = month;
this.day = day;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public int getDay() {
return day;
}
public void setDay(int day) {
this.day = day;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DateDimension that = (DateDimension) o;
if (year != that.year) return false;
if (month != that.month) return false;
return day == that.day;
}
@Override
public int hashCode() {
int result = year;
result = 31 * result + month;
result = 31 * result + day;
return result;
}
@Override
public int compareTo(BaseDimension o) {
if (o == this) return 0;
DateDimension anotherDateDimension = (DateDimension) o;
int result = Integer.compare(this.id, anotherDateDimension.getId());
if (result != 0) return result;
result = Integer.compare(this.year, anotherDateDimension.getYear());
if (result != 0) return result;
result = Integer.compare(this.month, anotherDateDimension.getMonth());
if (result != 0) return result;
result = Integer.compare(this.day, anotherDateDimension.getDay());
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.id);
dataOutput.writeInt(this.year);
dataOutput.writeInt(this.month);
dataOutput.writeInt(this.day);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readInt();
this.year = dataInput.readInt();
this.month = dataInput.readInt();
this.day = dataInput.readInt();
}
@Override
public String toString() {
return "DateDimension{" +
"id=" + id +
", year=" + year +
", month=" + month +
", day=" + day +
'}';
}
}
9) Create class ComDimension (the combination of the date dimension and the contact dimension; the composite key emitted by the Mapper)
package com.china.analysis.kv.key;
import com.china.analysis.kv.base.BaseDimension;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Combined dimension class: date dimension + contact dimension (wrapper class)
*
* @author chenmingjun
* 2019-03-19 11:42
*/
public class ComDimension extends BaseDimension {
// Contact dimension
private ContactDimension contactDimension = new ContactDimension();
// Date dimension
private DateDimension dateDimension = new DateDimension();
public ComDimension() {
super();
}
public ComDimension(ContactDimension contactDimension, DateDimension dateDimension) {
super();
this.contactDimension = contactDimension;
this.dateDimension = dateDimension;
}
public ContactDimension getContactDimension() {
return contactDimension;
}
public void setContactDimension(ContactDimension contactDimension) {
this.contactDimension = contactDimension;
}
public DateDimension getDateDimension() {
return dateDimension;
}
public void setDateDimension(DateDimension dateDimension) {
this.dateDimension = dateDimension;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ComDimension that = (ComDimension) o;
if (contactDimension != null ? !contactDimension.equals(that.contactDimension) : that.contactDimension != null)
return false;
return dateDimension != null ? dateDimension.equals(that.dateDimension) : that.dateDimension == null;
}
@Override
public int hashCode() {
int result = contactDimension != null ? contactDimension.hashCode() : 0;
result = 31 * result + (dateDimension != null ? dateDimension.hashCode() : 0);
return result;
}
@Override
public int compareTo(BaseDimension o) {
if (this == o) return 0;
ComDimension anotherComDimension = (ComDimension) o;
int result = this.dateDimension.compareTo(anotherComDimension.getDateDimension());
if (result != 0) return result;
result = this.contactDimension.compareTo(anotherComDimension.getContactDimension());
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
this.contactDimension.write(dataOutput);
this.dateDimension.write(dataOutput);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.contactDimension.readFields(dataInput);
this.dateDimension.readFields(dataInput);
}
@Override
public String toString() {
return "ComDimension{" +
"contactDimension=" + contactDimension +
", dateDimension=" + dateDimension +
'}';
}
}
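ComDimension.compareTo orders keys by the date dimension first and the contact dimension second, which decides how records are sorted and grouped before they reach the Reducer. The ordering can be sketched with a Comparator chain. The Key class below is a simplified, hypothetical stand-in (the id fields are left out) used only to show the resulting order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Sketch: sort composite keys by date fields first, then by phone number. */
class CompositeKeyOrderSketch {

    /** Simplified stand-in for the combined dimension key. */
    static final class Key {
        final String telephone;
        final int year;
        final int month;
        final int day;

        Key(String telephone, int year, int month, int day) {
            this.telephone = telephone;
            this.year = year;
            this.month = month;
            this.day = day;
        }

        @Override
        public String toString() {
            return telephone + "@" + year + "/" + month + "/" + day;
        }
    }

    /** Date fields are compared first, the contact field last. */
    static final Comparator<Key> DATE_THEN_CONTACT =
        Comparator.comparingInt((Key k) -> k.year)
            .thenComparingInt(k -> k.month)
            .thenComparingInt(k -> k.day)
            .thenComparing(k -> k.telephone);

    static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(DATE_THEN_CONTACT);
        return copy;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
            new Key("b", 2017, 8, 10),
            new Key("a", 2017, -1, -1),
            new Key("a", 2017, 8, -1),
            new Key("a", 2017, 8, 10));
        System.out.println(sorted(keys));
    }
}
```

Because -1 sorts before any real month or day, the year-level key comes first, followed by the month-level and day-level keys.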
10) Create class CountDurationValue (wraps the call count and call duration; the value emitted by the Reducer)
package com.china.analysis.kv.value;
import com.china.analysis.kv.base.BaseValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author chenmingjun
* 2019-03-19 15:26
*/
public class CountDurationValue extends BaseValue {
// Total number of calls for a dimension
private int callSum;
// Total call duration for a dimension
private int callDurationSum;
public CountDurationValue() {
super();
}
public CountDurationValue(int callSum, int callDurationSum) {
super();
this.callSum = callSum;
this.callDurationSum = callDurationSum;
}
public int getCallSum() {
return callSum;
}
public void setCallSum(int callSum) {
this.callSum = callSum;
}
public int getCallDurationSum() {
return callDurationSum;
}
public void setCallDurationSum(int callDurationSum) {
this.callDurationSum = callDurationSum;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(callSum);
dataOutput.writeInt(callDurationSum);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.callSum = dataInput.readInt();
this.callDurationSum = dataInput.readInt();
}
@Override
public String toString() {
return "CountDurationValue{" +
"callSum=" + callSum +
", callDurationSum=" + callDurationSum +
'}';
}
11) Create class JDBCUtil (wraps obtaining a JDBC connection and releasing database resources)
package com.china.utils;
import java.sql.*;
/**
* @author chenmingjun
* 2019-03-19 9:56
*/
public class JDBCUtil {
private static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
private static final String MYSQL_URL = "jdbc:mysql://hadoop102:3306/db_telecom?useUnicode=true&characterEncoding=UTF-8";
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PASSWORD = "123456";
/**
* Creates a JDBC connection
*
* @return the connection, or null if it could not be created
*/
public static Connection getConnection() {
try {
Class.forName(MYSQL_DRIVER_CLASS);
return DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
/**
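* Closes the database resources
*
* @param conn the connection to close, may be null
* @param stat the statement to close, may be null
* @param rs the result set to close, may be null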
*/
public static void close(Connection conn, Statement stat, ResultSet rs) {
try {
if (rs != null && !rs.isClosed()) {
rs.close();
}
if (stat != null && !stat.isClosed()) {
stat.close();
}
if (conn != null && !conn.isClosed()) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
12) Create class JDBCCacheBean (singleton JDBC connection)
package com.china.utils;
import java.sql.Connection;
import java.sql.SQLException;
/**
* Singleton JDBC connection
*
* @author chenmingjun
* 2019-03-19 10:18
*/
public class JDBCCacheBean {
private static Connection conn = null;
private JDBCCacheBean() {}
public static Connection getInstance() {
try {
if (conn == null || conn.isClosed() || !conn.isValid(3)) {
conn = JDBCUtil.getConnection();
}
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
}
13) Create interface DimensionConverter
package com.china.analysis.converter;
import com.china.analysis.kv.base.BaseDimension;
/**
* @author chenmingjun
* 2019-03-19 22:15
*/
public interface DimensionConverter {
/**
* Returns the id of the database row that matches the given baseDimension object; if no such row exists, it is inserted first and its id is returned
*/
int getDimensionId(BaseDimension baseDimension);
}
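The contract of getDimensionId (look the id up, create it if missing) pairs naturally with an in-memory LRU cache so that repeated keys do not hit the database. The project's own LRUCache class is not shown in this section, so the sketch below is an assumption about one common way to build such a cache: a LinkedHashMap in access order with removeEldestEntry. The loader function stands in for the MySQL query-or-insert step, and all names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch: get-or-load lookup in front of a small LRU cache. */
class LruLookupSketch {

    /** LRU cache built on LinkedHashMap's access-order mode. */
    static class LruCache<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // true = order entries by most recent access
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Evict the least recently used entry once the capacity is exceeded.
            return size() > capacity;
        }
    }

    private final LruCache<String, Integer> cache;
    private final Function<String, Integer> loader;
    int loaderCalls = 0;

    LruLookupSketch(int capacity, Function<String, Integer> loader) {
        this.cache = new LruCache<>(capacity);
        this.loader = loader; // stand-in for "query MySQL, insert if missing"
    }

    int getId(String cacheKey) {
        Integer cached = cache.get(cacheKey);
        if (cached != null) {
            return cached; // cache hit: no database round trip
        }
        loaderCalls++;
        int id = loader.apply(cacheKey);
        cache.put(cacheKey, id);
        return id;
    }

    public static void main(String[] args) {
        LruLookupSketch lookup = new LruLookupSketch(2, String::length);
        lookup.getId("a");
        lookup.getId("bb");
        lookup.getId("a");
        System.out.println("loader calls: " + lookup.loaderCalls);
    }
}
```

This sketch is not thread-safe; a shared instance would need external synchronization around getId.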
14) Create class DimensionConverterImpl
package com.china.analysis.converter.impl;
import com.china.analysis.converter.DimensionConverter;
import com.china.analysis.kv.base.BaseDimension;
import com.china.analysis.kv.key.ContactDimension;
import com.china.analysis.kv.key.DateDimension;
import com.china.utils.JDBCCacheBean;
import com.china.utils.JDBCUtil;
import com.china.utils.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* Dimension converter implementation: converts a dimension object into its dimension id
*
* @author chenmingjun
* 2019-03-19 22:24
*/
public class DimensionConverterImpl implements DimensionConverter {
// Logger for DimensionConverterImpl (make sure the slf4j classes are the ones imported)
private static final Logger logger = LoggerFactory.getLogger(DimensionConverterImpl.class);
// Keeps a separate Connection (JDBC connection) per thread
private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>();
// In-memory cache of dimension ids
private LRUCache<String, Integer> lruCache = new LRUCache<>(3000);
public DimensionConverterImpl() {
// When the JVM shuts down, try to release the database connection
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping mysql connection ...");
JDBCUtil.close(threadLocalConnection.get(), null, null);
logger.info("mysql connection closed");
}));
}
/**
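* Returns the primary key id in the dimension table for the given dimension object (with large data volumes a cache is needed)
* 1. In-memory cache (LRUCache)
* 1.1 Cache hit: return the id directly
* 1.2 Cache miss:
* 1.2.1 Query MySQL
* 1.2.1.1 The row exists: return its id and cache it in memory
* 1.2.1.2 The row does not exist: insert it, query again to obtain the id, cache it and return it
*
* @param baseDimension the dimension object to resolve
* @return the id of the matching row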
*/
@Override
public int getDimensionId(BaseDimension baseDimension) {
// Format of the entries kept in LRUCache
// Date dimension: date_dimension_year_month_day -> 10
// Contact dimension: contact_dimension_telephone_name -> 12
// 1. Build the cacheKey for the given dimension object
String cacheKey = genCacheKey(baseDimension);
// 2. If the cache already holds this cacheKey, return the cached id
if (lruCache.containsKey(cacheKey)) {
return lruCache.get(cacheKey);
}
// 3. Not cached: go to the database
// sqls holds a pair of statements: the query and the insert
String[] sqls = null;
if (baseDimension instanceof DateDimension) {
// Date dimension table tb_dimension_date
sqls = genDateDimensionSQL();
} else if (baseDimension instanceof ContactDimension) {
// Contact dimension table tb_dimension_contacts
sqls = genContactDimensionSQL();
} else {
// Unknown dimension type: fail fast with an unchecked exception
throw new RuntimeException("Cannot match the dimension, unknown dimension.");
}
// 4. Run against MySQL: query first, insert if the row is missing
Connection conn = this.getConnection();
int id = -1;
synchronized (this) {
id = execSQL(conn, sqls, baseDimension);
}
// Cache the id in memory
lruCache.put(cacheKey, id);
return id;
}
/**
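* Obtains a database connection: first from the thread-local slot, and if no usable connection is there, from the singleton connection holder.
*
* @return the connection
*/
private Connection getConnection() {
Connection conn = null;
try {
conn = threadLocalConnection.get();
if (conn == null || conn.isClosed() || !conn.isValid(3)) {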
conn = JDBCCacheBean.getInstance();
}
threadLocalConnection.set(conn);
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
/**
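* Executes the SQL statements
*
* @param conn the JDBC connection
* @param sqls array of length 2: the query statement first, the insert statement second
* @param baseDimension the dimension object holding the values to bind
* @return the id of the matching row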
*/
private int execSQL(Connection conn, String[] sqls, BaseDimension baseDimension) {
PreparedStatement ps = null;
ResultSet rs = null;
try {
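// 1. Assume the row already exists in the database
// Prepare the query statement
ps = conn.prepareStatement(sqls[0]);
// Bind the parameters for this dimension
setArguments(ps, baseDimension);
// Run the query
rs = ps.executeQuery();
if (rs.next()) {
return rs.getInt(1); // Note: result set column indexes start at 1
}
// Release the query resources before reusing the variables
JDBCUtil.close(null, ps, rs);
// 2. The row does not exist yet
// Prepare the insert statement
ps = conn.prepareStatement(sqls[1]);
// Bind the parameters for this dimension
setArguments(ps, baseDimension);
// Run the insert
ps.executeUpdate();
// 3. Release resources
JDBCUtil.close(null, ps, rs);
// 4. The row now exists, so query again to obtain its id
// Prepare the query statement
ps = conn.prepareStatement(sqls[0]);
// Bind the parameters for this dimension
setArguments(ps, baseDimension);
// Run the query
rs = ps.executeQuery();
if (rs.next()) {
return rs.getInt(1); // Note: result set column indexes start at 1
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
// Release resources
JDBCUtil.close(null, ps, rs);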
}
throw new RuntimeException("Failed to get id!");
}
/**
* Binds the statement parameters according to the type of dimension
*
* @param ps the statement to bind
* @param baseDimension the dimension object holding the values
*/
private void setArguments(PreparedStatement ps, BaseDimension baseDimension) {
// JDBC parameter indexes start at 1, hence the pre-increment
int i = 0;
try {
if (baseDimension instanceof DateDimension) {
DateDimension dateDimension = (DateDimension) baseDimension;
ps.setInt(++i, dateDimension.getYear());
ps.setInt(++i, dateDimension.getMonth());
ps.setInt(++i, dateDimension.getDay());
} else if (baseDimension instanceof ContactDimension) {
ContactDimension contactDimension = (ContactDimension) baseDimension;
ps.setString(++i, contactDimension.getTelephone());
ps.setString(++i, contactDimension.getName());
}
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
 * Generates the query and insert statements for the contact dimension table.
 *
 * @return array of length 2: {query statement, insert statement}
 */
private String[] genContactDimensionSQL() {
    String query = "SELECT `id` FROM `tb_dimension_contacts` WHERE `telephone`=? AND `name`=? ORDER BY `id`;";
    String insert = "INSERT INTO `tb_dimension_contacts` (`telephone`, `name`) VALUES (?, ?);";
    return new String[]{query, insert};
}
/**
 * Generates the query and insert statements for the date dimension table.
 *
 * @return array of length 2: {query statement, insert statement}
 */
private String[] genDateDimensionSQL() {
    String query = "SELECT `id` FROM `tb_dimension_date` WHERE `year`=? AND `month`=? AND `day`=? ORDER BY `id`;";
    String insert = "INSERT INTO `tb_dimension_date` (`year`, `month`, `day`) VALUES (?, ?, ?);";
    return new String[]{query, insert};
}
/**
 * Builds the cacheKey for the given dimension object.
 * Entries in the LRU cache look like <date_dimension_2017_8_20, 3> or <contact_dimension_15837312345_张三, 12>.
 *
 * @param baseDimension the dimension object to build a key for
 * @return the cache key
 */
private String genCacheKey(BaseDimension baseDimension) {
    StringBuilder sb = new StringBuilder();
    if (baseDimension instanceof DateDimension) {
        DateDimension dateDimension = (DateDimension) baseDimension;
        // Build the key for the cached id; the "_" separators keep dates such as
        // 2017-1-11 and 2017-11-1 from producing the same key
        sb.append("date_dimension");
        sb.append('_').append(dateDimension.getYear())
          .append('_').append(dateDimension.getMonth())
          .append('_').append(dateDimension.getDay());
    } else if (baseDimension instanceof ContactDimension) {
        ContactDimension contactDimension = (ContactDimension) baseDimension;
        // Build the key for the cached id
        sb.append("contact_dimension");
        sb.append('_').append(contactDimension.getTelephone())
          .append('_').append(contactDimension.getName());
    }
    if (sb.length() <= 0) {
        throw new RuntimeException("Cannot create cacheKey." + baseDimension);
    }
    return sb.toString();
}
}
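The lookup order used by the converter above (check the in-memory cache, then query the table, insert the row if it is missing, and finally cache the id) can be illustrated without a database. The following is only a minimal sketch under stated assumptions: the class name `DimensionIdLookupSketch`, the two maps that stand in for the LRU cache and the MySQL dimension table, and the key strings are all hypothetical and not part of the project code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the "cache, then query, then insert" lookup for dimension ids.
 * Hypothetical stand-ins: `cache` plays the role of the LRU cache and
 * `table` plays the role of a MySQL dimension table with an auto-increment id.
 */
public class DimensionIdLookupSketch {
    private final Map<String, Integer> cache = new HashMap<>();
    private final Map<String, Integer> table = new HashMap<>();
    private int nextId = 1;
    // Counts how often the "table" had to be consulted
    private int tableLookups = 0;

    public synchronized int getDimensionId(String cacheKey) {
        // 1. Fast path: the id was resolved before
        Integer cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // 2. Slow path: query the table and insert the row if it is missing
        tableLookups++;
        Integer id = table.get(cacheKey);
        if (id == null) {
            id = nextId++;
            table.put(cacheKey, id);
        }
        // 3. Remember the id so later calls skip the table
        cache.put(cacheKey, id);
        return id;
    }

    public int getTableLookups() {
        return tableLookups;
    }

    public static void main(String[] args) {
        DimensionIdLookupSketch sketch = new DimensionIdLookupSketch();
        int first = sketch.getDimensionId("key-A");
        int again = sketch.getDimensionId("key-A"); // served from the cache
        int other = sketch.getDimensionId("key-B");
        // prints "1 1 2 lookups=2"
        System.out.println(first + " " + again + " " + other + " lookups=" + sketch.getTableLookups());
    }
}
```

In a MapReduce job the same dimension values repeat for a large share of the records, so resolving each distinct dimension against the table once and reusing the cached id afterwards keeps the number of MySQL round trips small.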
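15) Create class: LRUCache
package com.china.utils;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * A simple LRU cache built on LinkedHashMap in access order.
 *
 * @author chenmingjun
 * 2019-03-19 22:59
 */
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    // Maximum number of entries kept in the cache
    protected int maxElements;
    public LRUCache(int maxSize) {
        // accessOrder = true: entries are ordered from least to most recently accessed
        super(maxSize, 0.75F, true);
        this.maxElements = maxSize;
    }
    // Called after each insertion; returning true evicts the least recently used entry
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return this.size() > this.maxElements;
    }
}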
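To see how the eviction behaves, here is a small self-contained sketch. It redeclares the same idea under the hypothetical name `SimpleLruCache` (so that it compiles on its own) and runs a short access sequence on a cache limited to two entries; the class and method names are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LruEvictionSketch {
    /** Same idea as the LRUCache class above, redeclared so the sketch is self-contained. */
    static class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;
        private final int maxElements;

        SimpleLruCache(int maxElements) {
            // accessOrder = true: iteration goes from least to most recently accessed
            super(16, 0.75F, true);
            this.maxElements = maxElements;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxElements;
        }
    }

    /** Returns the keys that remain after a short access sequence on a cache of size 2. */
    public static List<String> demo() {
        SimpleLruCache<String, Integer> cache = new SimpleLruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" becomes the most recently used entry
        cache.put("c", 3); // exceeds the limit, so the eldest entry ("b") is evicted
        return new ArrayList<>(cache.keySet());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, c]
    }
}
```

Because `get` counts as an access, an entry that keeps being read stays in the cache even if it was inserted first; only entries that have not been touched recently are dropped.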
16) Create class: Constants (constants class)
package com.china.constants;
/**
 * @author chenmingjun
 * 2019-03-19 9:57
 */
public class Constants {
    // Default number of statements per JDBC batch
    public static final int JDBC_DEFAULT_BATCH_NUMBER = 500;
    // HBase table to scan, in namespace:table form
    public static final String SCAN_TABLE_NAME = "ns_ct:calllog";
}
3.3.5、Run and test
0) Copy core-site.xml, hdfs-site.xml, log4j.properties and hbase-site.xml into the ct\ct_analysis\src\main\resources directory
1) Add the following to hadoop-env.sh:
[atguigu@hadoop102 hadoop]$ pwd
/opt/module/hadoop-2.7.2/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim hadoop-env.sh
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
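Note: after changing the configuration, distribute it to the other nodes and then restart the cluster for the change to take effect!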
2) Put the MySQL driver jar into the test directory /opt/module/flume/job/ct/lib
[atguigu@hadoop102 ct]$ pwd
/opt/module/flume/job/ct
[atguigu@hadoop102 ct]$ cp -a /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./lib/
3) Copy the ct_analysis-1.0-SNAPSHOT.jar to be run into /opt/module/hbase/lib, then sync it to the other machines, either with scp or with the distribution script
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/ct_analysis-1.0-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/ct_analysis-1.0-SNAPSHOT.jar hadoop104:/opt/module/hbase/lib/
or
[atguigu@hadoop102 ~]$ xsync /opt/module/hbase/lib/ct_analysis-1.0-SNAPSHOT.jar
4) Submit the job
[atguigu@hadoop102 ct]$ pwd
/opt/module/flume/job/ct
[atguigu@hadoop102 ct]$ /opt/module/hadoop-2.7.2/bin/yarn jar ./ct_analysis-1.0-SNAPSHOT.jar com.china.analysis.runner.CountDurationRunner -libjars ./lib/mysql-connector-java-5.1.27-bin.jar
5) Check the results in MySQL:
A quick sanity check of the data:
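3.3.6、Bug fixes
- 1. The MySQL connection URL already includes the database name, so the SQL statements should reference tables by table name only rather than in the `database.table` form.
- 2. The -libjars option must explicitly point to the actual location of the MySQL driver jar.
- 3. Classes from our own ct_analysis.jar could not be found, because that jar had not been added to Hadoop's classpath.
- Solution: copy the jar into HBase's lib directory (`Note`: after adding the jar, distribute it and restart the HBase cluster).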