几个关于MapReduce的小例子

2022-03-15 14:02:42 浏览数 (1)

文章已收录到我的Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary

写在前面

上一篇文章通过写一个WordCount学习了MapReduce的入门操作,那么这篇文章继续通过多一些例子来学习MapReduce。下面介绍几种比较常见的操作:排序,去重,求和,求平均数,TopK查询(查询排名前K名的记录)

排序

其实MapReduce会默认对Key进行升序自然排序,这显然是远远不够用的,下面我举个例子,输入的file1内容如下:

代码语言:javascript复制
1,256
1,12
3,283
4,478
2,1001
2,3600
1,4
5,78
2,33

file2内容如下:

代码语言:javascript复制
5,10
3,598
4,654
1,741
2,123
3,850
2,11568
1,12574

我们要的结果是根据第一个数字进行排序,如果第一个数字相同,则根据第二个数字排序,怎么玩呢?

首先我们得创建一个自定义的类,里面包括两个字段表示一行里面的第一个值和第二个值,接着实现序列化,反序列化方法,最重要是比较方法。

代码语言:javascript复制
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name ComparableKey
 * @date 2022-01-13 23:58
 **/
public class ComparableKey implements WritableComparable<ComparableKey> {
 //一行内容的第一个值
    private long firstNum;
 //第二个值
    private long secondNum;

    public ComparableKey() {
    }

    public ComparableKey(long firstNum, long secondNum) {
        this.firstNum = firstNum;
        this.secondNum = secondNum;
    }

    public long getFirstNum() {
        return firstNum;
    }

    public void setFirstNum(long firstNum) {
        this.firstNum = firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }

    public void setSecondNum(long secondNum) {
        this.secondNum = secondNum;
    }

    @Override
    public int compareTo(ComparableKey otherComparableKey) {
        //如果第一位数相等,则比较第二位数,从小到大排序
        if (firstNum == otherComparableKey.getFirstNum()) {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (secondNum - otherComparableKey.getSecondNum());
        } else {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (firstNum - otherComparableKey.getFirstNum());
        }
    }

    //序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(firstNum);
        dataOutput.writeLong(secondNum);
    }

    //反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        firstNum = dataInput.readLong();
        secondNum = dataInput.readLong();
    }
}

接着写Mapper,输入类型是Text,转换为自定义的ComparableKey类型,会自动调compareTo()方法进行比较排序。

代码语言:javascript复制
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortMapper
 * @date 2022-01-11 23:56
 **/

/**
 * Mapper有四个泛型参数需要填写
 * 第一个参数KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,类型为LongWritable
 * 第二个参数VALUEIN:默认情况下,是mr框架所读的一行文本的内容,类型为Text
 * 第三个参数KEYOUT:是逻辑处理完成之后输出数据的key,使用自定义的类型ComparableKey
 * 第四个参数VALUEOUT:是逻辑处理完成之后输出数据的value,在此处是次数,类型为NullWritable
 */
public class NumberSortMapper extends Mapper<LongWritable, Text, ComparableKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split(",");
        long firstNum = Long.parseLong(strings[0]);
        long secondNum = Long.parseLong(strings[1]);
        ComparableKey comparableKey = new ComparableKey(firstNum, secondNum);
        context.write(comparableKey, NullWritable.get());
    }
}

Mapper已经做了排序,那么Reduce层就只需要取出来就行了。

代码语言:javascript复制
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortReduce
 * @date 2022-01-14 00:18
 **/
public class NumberSortReduce extends Reducer<ComparableKey, NullWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(ComparableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(key.getFirstNum()), new LongWritable(key.getSecondNum()));
    }
}

最后再写个Main方法,作为入口:

代码语言:javascript复制
public class NumberSort {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(NumberSort.class);
        job.setMapperClass(NumberSortMapper.class);
        job.setReducerClass(NumberSortReduce.class);
        job.setMapOutputKeyClass(ComparableKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着把实验的文件上传上去hadoop的number_sort文件夹(自己创建的目录)。然后再执行任务,使用命令:

代码语言:javascript复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar NumberSort number_sort number_sort_output

执行成功后,效果如下:

代码语言:javascript复制
1 4
1 12
1 256
1 741
1 12574
2 33
2 123
2 1001
2 3600
2 11568
3 283
3 598
3 850
4 478
4 654
5 10
5 78

去重

比如以下的这个文本,单词去重,怎么做呢?

代码语言:javascript复制
hadoop is good
hadoop is so good
java is great
java and hadoop is very good

其实很简单,因为MapReduce输出的类型就是Map,Map的特性就是Key不能重复,于是乎我们可以把值想要去重的值放入Key,Value设置为NULL就完事了。Mapper步骤如下:

代码语言:javascript复制
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        String[] strings = value.toString().split(" ");
        for (String str : strings) {
            keyOut.set(str);
            context.write(keyOut,NullWritable.get());
        }
    }
}

Reduce步骤不需要做其他操作,直接取值即可。

代码语言:javascript复制
public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

再加个入口Main方法。

代码语言:javascript复制
public class DistinctMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(DistinctMain.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

把测试数据上传到hadoop上面。

然后执行命令如下:

代码语言:javascript复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar DistinctMain distinct distinct_output

输出结果如下:

代码语言:javascript复制
and
good
great
hadoop
is
java
so
very

去重完成。

求和

比如有一道很经典的数学题,对1到100进行求和,如果用笔算很简单,可以用首尾相加法,1加99,2加98...以此类推。但是用MapReduce怎么做呢?

代码语言:javascript复制
1
2
3
4
...
98
99
100

我们需要使用cleanup()方法,这个方法是在map方法执行完之后执行,只执行一次,看源码就明白了。

代码语言:javascript复制
//一般是啥事都不干,子类可以实现该方法做一些自己的事情
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while(context.nextKeyValue()) {
            this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        //执行完map方法后,执行cleanup()方法
        this.cleanup(context);
    }
}

那么问题就很简单了,Mapper实现代码如下:

代码语言:javascript复制
public class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private long sum = 0L;
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        long val = Long.parseLong(value.toString());
        sum  = val;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Reduce实现代码如下:

代码语言:javascript复制
public class SumReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

    private long sum = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        sum  = key.get();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Main方法入口:

代码语言:javascript复制
public class SumMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SumMain.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

打包成jar包,上传到服务器,然后把包含1到100文本上传到HDFS,执行命令跑任务:

代码语言:javascript复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar SumMain sum_main.txt sum_main_out

输出结果如下:

代码语言:javascript复制
5050

求平均数

求平均数也是很常见的操作,比如有一大堆随机生成的数字,求出平均数:

代码语言:javascript复制
10
25
22
78
119
88
56
32
29
25

求平均数的思路其实就是总和除以个数,所以Mapper阶段的输出就是<key,1>,代码如下:

代码语言:javascript复制
public class AverageMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(Long.parseLong(value.toString())), new IntWritable(1));
    }
}

第二步Reduce步骤就利用cleanup()计算平均数,计算前先计数,求和,代码如下:

代码语言:javascript复制
public class AverageReduce extends Reducer<LongWritable, IntWritable, Text, NullWritable> {

    private long sum = 0L;

    private long count = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        for (IntWritable value : values) {
            i  = value.get();
        }
        sum  = (key.get() * i);
        count  = i;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        BigDecimal sumBigDecimal = new BigDecimal(sum);
        BigDecimal countBigDecimal = new BigDecimal(count);
        BigDecimal result = sumBigDecimal.divide(countBigDecimal, 2, RoundingMode.HALF_UP);
        context.write(new Text(result.toString()), NullWritable.get());
    }
}

入口Main类如下:

代码语言:javascript复制
public class AverageMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(AverageMain.class);
        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着还是老套路,打包上传jar包和测试用的文件,接着执行命令跑任务:

代码语言:javascript复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar AverageMain average_main.txt average_main_out

输出结果如下:

代码语言:javascript复制
48.40

TopK查询

假设下面的文本,是单词以及单词出现的次数,要找出出现次数TOP5的单词,怎么做呢?

代码语言:javascript复制
c   12
redis 45
java 120
Python 50
JavaScript 41
GoLang 30
Spring 30
Mybatis 11
Hibernate 6
RabbitMQ 64
Kafka 78
Nacos 46
SpringCloud 32
MySQL 100
UML 12
Seata 22
ZooKeeper 38

这里我们可以借用TreeMap这个集合的特性,put进treeMap之后会默认从小到大自然排序,然后还提供倒序排序的方法descendingMap()

我写段代码示例一下吧:

代码语言:javascript复制
public static void main(String[] args) {
    TreeMap<Integer, String> treeMap = new TreeMap<>();
    Random random = new Random();
    for (int i = 0; i < 100; i  ) {
        //生成随机数
        int num = random.nextInt(100);
        //插入到treeMap
        treeMap.put(num, num   "");
    }
    for (Integer num : treeMap.keySet()) {
        System.out.println(num);
    }
}

//打印结果
0
2
3
6
8
10
11
12
14
15
...

于是乎我们可以开始写代码,先写Mapper类,比较简单,就是按空格分割一下,然后输出到Reduce。

代码语言:javascript复制
public class TopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(" ");
        String word = split[0];
        long count = Long.parseLong(split[1]);
        context.write(new LongWritable(count), new Text(word));
    }

}

输出到Reduce之后,Reduce这边就需要收集,然后做一些处理,代码如下:

代码语言:javascript复制
public class TopReduce extends Reducer<LongWritable, Text, Text, NullWritable> {

    private TreeMap<Long, String> treeMap = new TreeMap<>();

    private static final long TOP_K = 5;

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append("、");
        }
        //去掉最后一个顿号
        sb.deleteCharAt(sb.lastIndexOf("、"));
        treeMap.put(key.get(), sb.toString());
        //如果大于最大长度,则删掉第一个元素,因为第一个元素是最小的
        if (treeMap.size() > TOP_K) {
            treeMap.remove(treeMap.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        //倒序
        Map<Long, String> navigableMap = treeMap.descendingMap();
        //排名
        int i = 1;
        String s;
        for (Map.Entry<Long, String> entry : navigableMap.entrySet()) {
            s = "排名第"   i   "位 "   entry.getValue()   "出现次数"   entry.getKey()   "次";
            context.write(new Text(s), NullWritable.get());
            i  ;
        }
    }
    
}

最后再整个入口类Main。

代码语言:javascript复制
public class TopMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(TopMain.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

大功告成,然后打包上服务器,并且把测试用的文件也上传到服务器,接着执行命令跑任务:

代码语言:javascript复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar TopMain top_k_main.txt top_k_main_out

输出结果如下:

代码语言:javascript复制
排名第1位 java出现次数120次
排名第2位 MySQL出现次数100次
排名第3位 Kafka出现次数78次
排名第4位 RabbitMQ出现次数64次
排名第5位 Python出现次数50次

总结

这篇文章主要介绍了排序,去重,求和,求平均数,TopK查询的小例子,可以加深一下对MapReduce的理解,这篇文章就讲到这里了,希望对大家有所帮助。

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

0 人点赞