Read the cleaned data from HBase and write it into a MySQL table
NewInstallUserRunner.java
Entry class for computing new users
NewInstallUserRunner: all fields and methods
The main method:
public static void main(String[] args)
It contains a single call, ToolRunner.run:
org.apache.hadoop.util.ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args);
The entry class implements the Tool interface
public class NewInstallUserRunner implements Tool
Tool defines the run method
int run(String [] args) throws Exception;
Tool extends Configurable
public interface Tool extends Configurable
Configurable defines two methods
/** Set the configuration to be used by this object. */
void setConf(Configuration conf);
/** Return the configuration used by this object. */
Configuration getConf();
So the entry class has to implement three methods
int run(String [] args) throws Exception;
void setConf(Configuration conf);
Configuration getConf();
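ToolRunner calls these three methods in a fixed order. The sketch below reproduces that order without Hadoop: setConf is called first, then run is called and its int result is returned. This is why the setConf shown below is the place to load the XML resources. MiniConfigurable, MiniTool and DemoRunner are simplified stand-ins and not the Hadoop classes. A plain Map replaces Configuration. Hadoop's generic-option parsing (for example -D) is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for Hadoop's Configurable / Tool / ToolRunner.
// They only illustrate the call order; they are NOT the Hadoop classes.
final class ToolRunnerSketch {

    interface MiniConfigurable {
        void setConf(Map<String, String> conf);
        Map<String, String> getConf();
    }

    interface MiniTool extends MiniConfigurable {
        int run(String[] args); // Hadoop's Tool.run additionally declares "throws Exception"
    }

    // Same order as ToolRunner.run: create a config if none was given, setConf, then run.
    static int run(Map<String, String> conf, MiniTool tool, String[] args) {
        if (conf == null) {
            conf = new HashMap<>();
        }
        tool.setConf(conf);
        return tool.run(args);
    }

    // Toy tool shaped like NewInstallUserRunner: setConf enriches the configuration,
    // run reads it back through getConf and returns 0 on success, -1 on failure.
    static final class DemoRunner implements MiniTool {
        private Map<String, String> conf;

        @Override
        public void setConf(Map<String, String> conf) {
            conf.put("fs.defaultFS", "hdfs://master:8020"); // same value the article's setConf uses
            this.conf = conf;
        }

        @Override
        public Map<String, String> getConf() {
            return conf;
        }

        @Override
        public int run(String[] args) {
            return getConf().containsKey("fs.defaultFS") ? 0 : -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("exit code: " + run(null, new DemoRunner(), args));
    }
}
```

The sketch also shows that the int returned by run only becomes the process exit status if main passes it on. The usual way to do that is System.exit(ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args)). The main shown above discards the value.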
The setConf implementation
@Override
public void setConf(Configuration conf) {
conf.addResource("output-collector.xml");
conf.addResource("query-mapping.xml");
conf.addResource("transformer-env.xml");
conf.set("fs.defaultFS", "hdfs://master:8020");
conf.set("yarn.resourcemanager.hostname", "master");
conf.set("hbase.zookeeper.quorum", "master");
this.conf = HBaseConfiguration.create(conf);
}
Defines the output collector class, which is loaded via reflection
output-collector.xml
<property>
<name>collector_new_install_user</name>
<value>com.sxt.transformer.mr.nu.StatsUserNewInstallUserCollector</value>
</property>
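Here is a minimal sketch of how a class name stored under collector_<kpi> can be turned into an object by reflection. The key prefix matches the XML above. The lookup code, the Collector interface and DemoCollector are all hypothetical stand-ins and are not the project's own output-format code. A Map plays the role of the loaded Configuration.

```java
import java.util.HashMap;
import java.util.Map;

final class CollectorLookupSketch {

    // Hypothetical stand-in for the project's collector type.
    interface Collector {
        String describe();
    }

    // Hypothetical implementation; the real XML value is
    // com.sxt.transformer.mr.nu.StatsUserNewInstallUserCollector.
    public static final class DemoCollector implements Collector {
        public DemoCollector() {
        }

        @Override
        public String describe() {
            return "demo collector";
        }
    }

    // Reads "collector_" + kpiName from the config and instantiates the named class
    // through its public no-arg constructor.
    static Collector create(Map<String, String> conf, String kpiName) {
        String className = conf.get("collector_" + kpiName);
        if (className == null) {
            throw new IllegalArgumentException("no collector configured for " + kpiName);
        }
        try {
            Class<?> clazz = Class.forName(className);
            return (Collector) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Stand-in for the <property> entry in output-collector.xml.
        conf.put("collector_new_install_user", DemoCollector.class.getName());
        System.out.println(create(conf, "new_install_user").describe());
    }
}
```

Because the class is named in XML, a new KPI needs only a new property and a new class. The calling code does not change. The trade-off is that a typo in the class name only shows up at run time.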
Used to build the INSERT statement when writing to MySQL
query-mapping.xml
<property>
<name>new_install_user</name>
<value>
INSERT INTO `stats_user`(
`platform_dimension_id`,
`date_dimension_id`,
`new_install_users`,
`created`)
VALUES(?, ?, ?, ?) ON DUPLICATE KEY UPDATE `new_install_users` = ?
</value>
</property>
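The statement has five placeholders, not four. The fifth one, in the ON DUPLICATE KEY UPDATE clause, needs the new_install_users value a second time. The sketch below spells out that binding order. The helper names and sample values are made up. The created column is assumed to be bound as a string, and its real type is not shown in the article.

```java
import java.util.Arrays;
import java.util.List;

final class NewInstallUserSqlSketch {

    static final String SQL =
            "INSERT INTO `stats_user`(`platform_dimension_id`, `date_dimension_id`, "
            + "`new_install_users`, `created`) VALUES(?, ?, ?, ?) "
            + "ON DUPLICATE KEY UPDATE `new_install_users` = ?";

    // Counts JDBC '?' placeholders; a plain scan is enough because the statement has no string literals.
    static int countPlaceholders(String sql) {
        int n = 0;
        for (char ch : sql.toCharArray()) {
            if (ch == '?') {
                n++;
            }
        }
        return n;
    }

    // Values in parameter-index order 1..5; the 5th repeats new_install_users for the UPDATE branch.
    static List<Object> bindOrder(int platformId, int dateId, int newInstallUsers, String created) {
        return Arrays.asList(platformId, dateId, newInstallUsers, created, newInstallUsers);
    }

    public static void main(String[] args) {
        List<Object> params = bindOrder(1, 20, 300, "2017-06-01"); // made-up sample values
        for (int i = 0; i < params.size(); i++) {
            // With a real PreparedStatement this would be ps.setObject(i + 1, params.get(i)).
            System.out.println("param " + (i + 1) + " = " + params.get(i));
        }
    }
}
```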
Connection settings for the MySQL output
transformer-env.xml
The getConf implementation
@Override
public Configuration getConf() {
return this.conf;
}
The run implementation
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// Process the command-line arguments
this.processArgs(conf, args);
// Job job = Job.getInstance(conf, "new_install_user");
System.out.println("KpiType.NEW_INSTALL_USER.toString()===="
+ KpiType.NEW_INSTALL_USER.name);
Job job = Job.getInstance(conf, KpiType.NEW_INSTALL_USER.name);
// job.setJarByClass(NewInstallUserRunner.class);
// Local run
TableMapReduceUtil.initTableMapperJob(initScans(job),
NewInstallUserMapper.class, StatsUserDimension.class,
TimeOutputValue.class, job, false);
// Cluster run: submitted from the local machine or as a packaged jar
// TableMapReduceUtil.initTableMapperJob(initScans(job),
// NewInstallUserMapper.class, StatsUserDimension.class,
// TimeOutputValue.class, job);
job.setReducerClass(NewInstallUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TransformerOutputFormat.class);
if (job.waitForCompletion(true)) {
// On success, the total users still need to be computed
this.calculateTotalUsers(conf);
return 0;
} else {
return -1;
}
}
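The first statement in run: processArgs handles the arguments
// Call the method that processes the arguments
this.processArgs(conf, args);
How processArgs works
Once the first statement of run, this.processArgs(conf, args);, has finished, the run date is set. By default it is yesterday. Otherwise it is the date passed at launch with -d, in yyyy-MM-dd format.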
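The behaviour described above can be sketched with java.time. The method name resolveRunDate is hypothetical. The sketch simply returns the date string, and how the real processArgs hands the date to the job is not shown here. It also assumes that a malformed -d value falls back to yesterday, just like a missing one.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

final class RunDateSketch {

    // MM = month, dd = day of month. "uuuu" (proleptic year) is used because STRICT
    // resolution of "yyyy" (year-of-era) would also require an era field.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd").withResolverStyle(ResolverStyle.STRICT);

    // Hypothetical helper: the value after "-d" if it is a valid date, otherwise the day before `today`.
    static String resolveRunDate(String[] args, LocalDate today) {
        String date = null;
        for (int i = 0; i < args.length - 1; i++) {
            if ("-d".equals(args[i])) {
                date = args[i + 1];
                break;
            }
        }
        if (date != null) {
            try {
                return LocalDate.parse(date, FORMAT).format(FORMAT);
            } catch (DateTimeParseException e) {
                // Invalid date: fall back to the default below (an assumption of this sketch).
            }
        }
        return today.minusDays(1).format(FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(resolveRunDate(args, LocalDate.now()));
    }
}
```

Passing today in as a parameter, rather than calling LocalDate.now() inside, keeps the default-to-yesterday rule testable.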
The second statement in run creates the Job
// Rather than hard-coding the job name:
Job job = Job.getInstance(conf, "new_install_user");
// it is better to take it from the KpiType enum, as run does:
Job job = Job.getInstance(conf, KpiType.NEW_INSTALL_USER.name);
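The enum referenced in that argument:
public enum KpiType {
NEW_INSTALL_USER("new_install_user"), // KPI for new users
BROWSER_NEW_INSTALL_USER("browser_new_install_user"), // KPI for new users by browser dimension
ACTIVE_USER("active_user"), // KPI for active users
BROWSER_ACTIVE_USER("browser_active_user"), // KPI for active users by browser dimension
;
public final String name; // KPI name, used as the job name
private KpiType(String name) {
this.name = name;
}
}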
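The same KPI name appears in three places: it is the job name, the query-mapping.xml key new_install_user, and the suffix of the output-collector.xml key collector_new_install_user. The sketch below makes that convention explicit on a trimmed copy of the enum. collectorKey, sqlKey and fromName are hypothetical helpers, and the article does not show whether the project has anything like them.

```java
final class KpiKeysSketch {

    // Trimmed copy of KpiType with hypothetical helper methods.
    enum KpiType {
        NEW_INSTALL_USER("new_install_user"),
        ACTIVE_USER("active_user");

        final String name;

        KpiType(String name) {
            this.name = name;
        }

        // Hypothetical: property name in output-collector.xml
        String collectorKey() {
            return "collector_" + name;
        }

        // Hypothetical: property name in query-mapping.xml (the KPI name itself)
        String sqlKey() {
            return name;
        }

        // Hypothetical reverse lookup from a KPI name back to the constant
        static KpiType fromName(String name) {
            for (KpiType t : values()) {
                if (t.name.equals(name)) {
                    return t;
                }
            }
            throw new IllegalArgumentException("unknown kpi: " + name);
        }
    }

    public static void main(String[] args) {
        KpiType kpi = KpiType.NEW_INSTALL_USER;
        System.out.println("job name      = " + kpi.name);
        System.out.println("collector key = " + kpi.collectorKey());
        System.out.println("sql key       = " + kpi.sqlKey());
    }
}
```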
The third statement in run: TableMapReduceUtil.initTableMapperJob sets up the job's HBase input and mapper
TableMapReduceUtil.initTableMapperJob(initScans(job),
NewInstallUserMapper.class,
StatsUserDimension.class,
TimeOutputValue.class, job,
false);
addDependencyJars: set it to true when the job runs on the cluster and to false when it runs locally.
/**
* Use this before submitting a Multi TableMap job.
* It will appropriately setup the job.
*
* @param scans The list of {@link Scan} objects to read from.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is carrying
* all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the
* configured job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(List<Scan> scans,
Class<? extends TableMapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, Job job,
boolean addDependencyJars) throws IOException {
initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, true);
}
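Following the rule above, the flag could be derived from configuration instead of being edited by hand between local and cluster runs. This is a hedged sketch. It assumes the run mode can be read from Hadoop's mapreduce.framework.name property ("local" by default, "yarn" on a cluster), and it uses a plain Map in place of Configuration so it runs without Hadoop on the classpath.

```java
import java.util.Collections;
import java.util.Map;

final class DependencyJarsSketch {

    static boolean shouldAddDependencyJars(Map<String, String> conf) {
        // "local" is Hadoop's default for mapreduce.framework.name.
        String framework = conf.getOrDefault("mapreduce.framework.name", "local");
        // Article's rule: ship HBase and job jars via tmpjars only when not running locally.
        return !"local".equalsIgnoreCase(framework.trim());
    }

    public static void main(String[] args) {
        Map<String, String> clusterConf = Collections.singletonMap("mapreduce.framework.name", "yarn");
        System.out.println(shouldAddDependencyJars(clusterConf));
        System.out.println(shouldAddDependencyJars(Collections.emptyMap()));
    }
}
```

In the runner itself, the same check would read conf.get("mapreduce.framework.name", "local") from the Hadoop Configuration. The result would then be passed as the last argument of initTableMapperJob.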
The first argument: initScans(job)
http://www.jianshu.com/p/fd25d036d4dc
The second argument: NewInstallUserMapper.class
http://www.jianshu.com/p/1493de43dbf7
The third and fourth arguments are the mapper's output key and value classes (K, V).
The next statement in run: job.setReducerClass
job.setReducerClass(NewInstallUserReducer.class);
The next statement in run: job.setOutputFormatClass
job.setOutputFormatClass(TransformerOutputFormat.class);
The final branch in run: once the MapReduce job succeeds, compute the total users
if (job.waitForCompletion(true)) {
// On success, the total users still need to be computed
this.calculateTotalUsers(conf);
return 0;
} else {
return -1;
}
private void calculateTotalUsers(Configuration conf)