本次博主分享的是MapReduce的另一进阶知识计数器应用及数据清洗(ETL)。希望大家能够喜欢
一. 计数器应用
Hadoop为每个作业维护若干内置计数器,以描述多项指标。 比如说,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。
1.1 计数器API
- 1. 采用枚举的方式统计计数
eunm MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
- 2. 采用计数器组、计数器名称的方式统计
context.getCounter("counterGroup","counter").increment(1);
// 组名和计数器名称随便起,但最好有意义。
1.2 计数器案例
通过下面的数据清洗案例分析
二. 简单的数据清洗案例
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
2.1 需求
去掉日志中字段长度小于等于11的日志。
- 1. 输入数据
- 2. 期望输出数据 每行字段长度都大于11。
2.2 需求分析
代码语言:javascript复制需要在Map阶段对输入的数据根据规则进行过滤清洗。
2.3 代码实现
- 1. 创建ETLMapper类
package com.buwenbuhuo.ETL;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author 卜温不火
* @create 2020-04-25 20:08
* com.buwenbuhuo.ETL - the name of the target package where the new class or interface will be created.
* mapreduce0422 - the name of the current project.
*/
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 截取
String[] fields = value.toString().split(" ");
// 2 日志长度大于11的为合法
if (fields.length > 11) {
// 3 写出数据
context.write(value, NullWritable.get());
// 4 系统计数器
context.getCounter("ETL", "True").increment(1);
} else {
context.getCounter("ETL", "False").increment(1);
}
}
}
- 2. 创建ETLDriver类
package com.buwenbuhuo.ETL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author 卜温不火
* @create 2020-04-25 20:08
* com.buwenbuhuo.ETL - the name of the target package where the new class or interface will be created.
* mapreduce0422 - the name of the current project.
*/
public class ETLDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job信息
Job job = Job.getInstance(new Configuration());
// 2 加载jar包
job.setJarByClass(ETLDriver.class);
// 3 关联map
job.setMapperClass(ETLMapper.class);
// 设置reducetask个数为0
job.setNumReduceTasks(0);
// 4 设置最终输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("d:\input"));
FileOutputFormat.setOutputPath(job, new Path("d:\output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
2.4 运行及结果
- 1. 运行
- 2. 结果