Hadoop的分布式计算系统MapReduce

2022-10-27 16:37:57 浏览数 (2)

一.序列化

在MapReduce中要求被传输的数据能够被序列化 MapReduce中的序列化机制使用的是AVRO,MapReduce对AVRO进行了封装 被传输的类实现Writable接口实现方法即可

二.mapreduce 排序

代码语言:javascript复制
在MapReduce中会自动对被传输的key值进行排序,如果使用一个对象
作为输出键,那么要求对象相对应的类应该实现Comparable接口,考虑到
MapReduce中被传输的对象要求被序列化,所以MapReduce中提供了WritableComparable
接口.
如果ComparaTo方法中返回值为0,则MapReduce在进行计算时会把两个键的值放到
一个迭代器中,输出是第二个key是没有记录的。

mapreduce 分区

代码语言:javascript复制
我们在使用MapReduce对HDFS中的数据进行计算时,有时可能会有分类
输出的场景,MapReduce中提供了Partitioner类,我们在使用时只需继承
该类,然后重写getPartition方法即可,分区编号默认从0开始。
有多少个分区JobTracter就会分配多少个reduceTask。分区数量要在
驱动类中指定,如果不指定分区类与ReduceTask的数量,则使用默认
的HashPartitioner类进行分区,也就是自定义的分区无效。

mapreduce 合并

代码语言:javascript复制
1.合并是减少数据总量并没有改变计算结果 - Combiner(合并)实际上只是
让MapTask进行提前聚合,最后ReduceTask在进行总的聚合.
2.并不是所有的场景都适合于用Combiner,像求和、求最值、去重等可以使用
combiner,但是例如求平均的场景不适合与使用Combiner

inputFormat

inputFormat用来获取切片并创建流来读取数据,如果不自定义InputFormat 则默认使用TextInputFormt按行读取Mapper获取的key值为当前行数的偏移量, 自定义inputFormat类只需要继承FileInputFormat类自定义读取文件的即可.

Code

案例:对HDFS中的case.txt文件按月份对每个人的收益进行降序排序

inputformat类
代码语言:javascript复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;

// 泛型表示读完之后给mapper的数据类型
public class CaseInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new CaseReader();
    }
}

class CaseReader extends RecordReader<Text,Text>{

    private LineReader reader;
    private Text key;
    private Text value;

    // 初始化Reader
    // 最终目的就是拿到读取切片的流
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        FileSplit fs = (FileSplit) inputSplit;
        Path path = fs.getPath();

        // 连接HDFS获取切片
        FileSystem fileSystem = FileSystem.get(URI.create(path.toString()), taskAttemptContext.getConfiguration());
        // 打开对应的文件获取输入流
        InputStream in = fileSystem.open(path);
       // 字节流转换为字符流按行读取
        reader = new LineReader(in);
    }

    // 判断有无下一个键值对
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        key = new Text();
        value = new Text();
        Text tmp = new Text();

        // 将读取到的一行数据传入到这个参数中 返回值为读取到字节数
        if (reader.readLine(tmp) == ) {
            return false;
        }
        key.set(tmp);

        if (reader.readLine(tmp) == ) {
            return false;
        }
        value.set(tmp);

        return true;
    }

    // 获取当前key值
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // 获取当前value值
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // 获取进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return ;
    }

    // 关流
    @Override
    public void close() throws IOException {
        if (reader != null)
            reader.close();
    }
}
Case类用来封装读取的文本
代码语言:javascript复制
package com.jmy.profitcase;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Case implements WritableComparable<Case> {
    private int month;

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    private String name;
    private int inCome;
    private int outCome;
    private int countCome;

    public int getInCome() {
        return inCome;
    }

    public void setInCome(int inCome) {
        this.inCome = inCome;
    }

    public int getOutCome() {
        return outCome;
    }

    public void setOutCome(int outCome) {
        this.outCome = outCome;
    }

    public int getCountCome() {
        return inCome - outCome;
    }

    public void setCountCome(int countCome) {
        this.countCome = countCome;
    }

    @Override
    public int compareTo(Case o) {
        return o.getCountCome() - this.getCountCome();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(month);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(inCome);
        dataOutput.writeInt(outCome);
        dataOutput.writeInt(countCome);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        month = dataInput.readInt();
        name = dataInput.readUTF();
        inCome = dataInput.readInt();
        outCome = dataInput.readInt();
        countCome = dataInput.readInt();
    }
}
Mapper类
代码语言:javascript复制
package com.jmy.profitcase;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CaseMapper extends Mapper<Text,Text,Case,NullWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String[] s = key.toString().split(" ");
        String[] s1 = value.toString().split(" ");
        Case aCase = new Case();
        aCase.setMonth(Integer.parseInt(s[]));
        aCase.setName(s[]);
        aCase.setInCome(Integer.parseInt(s1[]));
        aCase.setOutCome(Integer.parseInt(s1[]));

        context.write(aCase,NullWritable.get());
    }
}
Reducer类
代码语言:javascript复制
package com.jmy.profitcase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CaseReducer extends Reducer<Case, NullWritable,Text, IntWritable> {
    @Override
    protected void reduce(Case key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new Text(key.getName()),new IntWritable(key.getCountCome()));
    }
}
Driver类
代码语言:javascript复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CaseDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job 提交给JobTracter
        Job job = Job.getInstance();

        // 入口类
        job.setJarByClass(CaseDriver.class);
        // mapper类
        job.setMapperClass(CaseMapper.class);
        // reducer类
        job.setReducerClass(CaseReducer.class);
        // inputformat类
        job.setInputFormatClass(CaseInputFormat.class);
        // partitioner类 reduceTask数量
        job.setPartitionerClass(CasePartition.class);
        job.setNumReduceTasks();

        // mapper
        job.setMapOutputKeyClass(Case.class);
        job.setMapOutputValueClass(NullWritable.class);
        // reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 处理文件路径
        FileInputFormat.addInputPath(job,new Path("hdfs://10.42.99.103:9000/case.txt"));
        // 文件输出路径
        FileOutputFormat.setOutputPath(job,new Path("hdfs://10.42.99.103:9000/caseResult"));

        // 启动
        job.waitForCompletion(true);
    }
}
case.txt
代码语言:javascript复制
1 ls 
2850 100
2 ls
3566 200
3 ls 
4555 323
1 zs 
19000 2000
2 zs 
28599 3900
3 zs 
34567 5000
1 ww 
355 10
2 ww 
555 222
3 ww 
667 192

数据本地化策略

代码语言:javascript复制
1.job会被提交到JobTracker,JobTracker会访问HDFS中的namenode,
获取Block的存储位置以及大小.
2.JobTracker收到文件信息之后会对文件进行切片,默认Block的大小
就是切片的大小,切片的数量决定了mapTask的数量。
3.JobTacker计算完mapTask与ReduceTask的数量之后会把任务提交给TaskTracker
,为了减少集群间节点间的访问,TaskTracker会与datanode部署在同一个节点上
4. JobTracker在分配任务的时候,会尽量将任务分配给有数据的节点
  • 如果是空文件,则整个文件作为一个切片处理
  • 在MapReduce中,文件有可切和不可切的区分。在MapReduce中,默认文件是可切的,但是有些文件处理的时候不能切分,这个时候需要手动设置为不可切,例如压缩包
  • 如果文件不可切,则整个文件作为一个切片处理
  • 计算切片大小的公式为Math.min(minSize,Math.max(spilteSize,maxSize))
  • 在进行切片计算的时候底层有一个阈值为1.1

Job任务提交流程

代码语言:javascript复制
1.客户端将任务提交给JobTracker:hadoop jar ***.jar
2.准备阶段:
  a.检查输入路径是否存在,输出路径是否不存在
  b.计算切片数量以及分区
  c.如果有需要,可以设置分布式缓存存根账户
  d.将jar包提交到HDFS上
  e.将任务提交到JobTracker上
3. 提交阶段
a. JobTracker会计算MapTask的数量和ReduceTask的数量。
MapTask的数量由切片数量决定,ReduceTask的数量由分区数量决定
b. JobTracker在划分好之后,会等待TaskTracker的心跳。
当收到TaskTracker的心跳的时候,JobTracker就会将MapTask或者
ReduceTask分配给TaskTracker。在分配的时候,MapTask要尽量满足
数据本地化策略;ReduceTask尽量分配到相对空闲的节点上
c. TaskTracker在领取到任务之后,去连接HDFS下载对应的jar包
体现的逻辑移动数据固定的思想
d. TaskTracker会在本节点上开启一个JVM子进程执行MapTask或者
ReduceTask。每一个MapTask或者ReduceTask的执行都会开启一个JVM
子进程

Shuffle

一、Map端的Shuffle
代码语言:javascript复制
1. Mapper中的map方法在处理完一行数据之后,会将数据写出到缓冲区中
2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine合并 - 采取了快速排序的方式
3. 每一个MapTask自带一个缓冲区,缓冲区本质上是一个环形的字节数组。设置为环形的优势在于能够重复利用缓冲区而不用寻址
4. 缓冲区是维系在内存中,缓冲区的默认容量是100M
5. 缓冲区的容量使用达到一定限度(溢写阈值:0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞)的时候,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中
6. 每一次溢写都会产生一个新的溢写文件。单个溢写文件中的数据是分区且排序的,但是所有的溢写文件中的数据是局部有序整体无序
7. 当MapTask将所有数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。如果一部分结果在缓冲区中,一部分结果在溢写文件中,这个时候所有的结果会直接合并到最后的final out中。如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out中
8. 在merge过程中,数据会再次进行分区和排序,所以final out是整体分区且有序。这个过程中的排序使用的是归并排序。如果指定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine
二、Reduce端的Shuffle
代码语言:javascript复制
1. 每一个ReduceTask启动fetch线程通过get请求去抓取数据
2. 在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据。在抓取到数据的之后,会将数据存储在本地的磁盘上
3. 在抓取完成之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在合并过程中,数据会再次进行排序,采取的是归并排序
4. 合并完成之后,会将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group),形成的结构就是一个键对应一个迭代器每一个键触发一次reduce方法

0 人点赞