MapReduce快速入门系列(14) | MapReduce之计数器应用及简单的数据清洗(ETL)

2020-10-28 15:25:12 浏览数 (1)

本次博主分享的是MapReduce的另一进阶知识计数器应用及数据清洗(ETL)。希望大家能够喜欢

一. 计数器应用

  Hadoop为每个作业维护若干内置计数器,以描述多项指标。   比如说,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

1.1 计数器API

  • 1. 采用枚举的方式统计计数
代码语言:javascript复制
eunm MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
  • 2. 采用计数器组、计数器名称的方式统计
代码语言:javascript复制
context.getCounter("counterGroup","counter").increment(1);
// 组名和计数器名称随便起,但最好有意义。

1.2 计数器案例

通过下面的数据清洗案例分析

二. 简单的数据清洗案例

  在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

2.1 需求

去掉日志中字段长度小于等于11的日志。

  • 1. 输入数据
  • 2. 期望输出数据 每行字段长度都大于11。

2.2 需求分析

代码语言:javascript复制
需要在Map阶段对输入的数据根据规则进行过滤清洗。

2.3 代码实现

  • 1. 创建ETLMapper类
代码语言:javascript复制
package com.buwenbuhuo.ETL;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 20:08
 * com.buwenbuhuo.ETL - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 截取
        String[] fields = value.toString().split(" ");

        // 2 日志长度大于11的为合法
        if (fields.length > 11) {

            // 3 写出数据
            context.write(value, NullWritable.get());

            // 4 系统计数器
            context.getCounter("ETL", "True").increment(1);
        } else {
            context.getCounter("ETL", "False").increment(1);
        }

    }
}
  • 2. 创建ETLDriver类
代码语言:javascript复制
package com.buwenbuhuo.ETL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 20:08
 * com.buwenbuhuo.ETL - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class ETLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //  1 获取job信息
        Job job = Job.getInstance(new Configuration());

        // 2 加载jar包
        job.setJarByClass(ETLDriver.class);

        // 3 关联map
        job.setMapperClass(ETLMapper.class);

        // 设置reducetask个数为0
        job.setNumReduceTasks(0);

        // 4 设置最终输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("d:\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.4 运行及结果

  • 1. 运行
  • 2. 结果

0 人点赞