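Splitting Mechanism
A very large file is stored in HDFS as multiple blocks spread across different nodes. For example, with the default HDFS block size of 128 MB, a 512 MB file is divided into 4 blocks, which may be stored on up to 4 nodes of the cluster.
How many MapTasks does Hadoop launch to process that 512 MB file in the map phase? MapTask parallelism is determined by data splitting: a split is a logical partition of the input, and Hadoop launches one MapTask per split to process the file in parallel, as the figure below illustrates.
Block vs. split: a block is a physical unit, because HDFS actually cuts the data into blocks on disk; a split only partitions the input logically, and nothing is re-cut or re-stored on disk. As in the figure below, a 512 MB file stored in HDFS with the default 128 MB block size needs 4 blocks of physical storage. If that file is split with a split size of 100 MB, it is logically divided into 6 splits (five of 100 MB plus a 12 MB remainder), so 6 MapTasks are needed to process it.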
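The block and split counts above can be checked with a few lines of plain Java. This is only a sketch with no Hadoop dependency: the class and method names are made up for illustration, and the 1.1 tolerance is the SPLIT_SLOP value that FileInputFormat uses in its splitting loop.

```java
/** Hypothetical helper: counts physical blocks and logical splits for a single file. */
final class BlockVsSplitSketch {
    // Tolerance used by FileInputFormat: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    /** Number of HDFS blocks needed to store the file (ceiling division). */
    static long countBlocks(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    /** Number of logical splits, mirroring the loop in FileInputFormat.getSplits. */
    static long countSplits(long fileSize, long splitSize) {
        long splits = 0;
        long remaining = fileSize;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // the tail becomes one final split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println("blocks = " + countBlocks(512 * mb, 128 * mb)); // blocks = 4
        System.out.println("splits = " + countSplits(512 * mb, 100 * mb)); // splits = 6
    }
}
```

With a 128 MB split size the same file yields 4 splits, one per block, which is why the default configuration keeps splits aligned with blocks.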
I. Data split source code walkthrough (FileInputFormat.getSplits)
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
/*
* 1. minSize defaults to 1;
* maxSize defaults to Long.MAX_VALUE (9,223,372,036,854,775,807)
* */
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
/*
* 2. List all input files to be processed
* */
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
/*
* 3. Get the file length in bytes
* */
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
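/*
* 4. Get the file's block locations. For example, with the default 128 MB block size,
* a 500 MB file is stored as 4 blocks, typically on different DataNodes
* */
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
/*
* 5. Unless configured otherwise Hadoop uses HDFS by default,
* in which case only the if branch above is taken
* */
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
/*
* 6. Get the block size (128 MB by default)
* */
long blockSize = file.getBlockSize();
/*
* 7. Compute splitSize = max(minSize, min(maxSize, blockSize)).
* With the defaults, min(maxSize, blockSize) is blockSize and max(minSize, blockSize) is blockSize,
* so the split size defaults to the block size, 128 MB
* */
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
/*
* 8. Work out how many logical splits the file yields and add each split to the list.
* A full-size split is cut only while the remainder exceeds SPLIT_SLOP (1.1) times splitSize
* */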
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
/*
* 9. The remaining tail (smaller than a full split, or at most 1.1 x splitSize)
* goes into one final split
* */
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
/*
* 10. If the file cannot be split (for example, a file compressed with a
* non-splittable codec such as gzip), a single split covers the whole file
* */
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length file
/*
* 11. For an empty file, create an empty split
* */
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
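To see how minSize and maxSize steer the split size, the formula from step 7 can be tried in isolation. The sketch below reimplements only that formula in plain Java (the class name is hypothetical). In a real job the two bounds come from the properties mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize, or from FileInputFormat.setMinInputSplitSize and FileInputFormat.setMaxInputSplitSize.

```java
/** Hypothetical stand-alone copy of the split-size formula, for experimentation only. */
final class ComputeSplitSizeSketch {
    /** splitSize = max(minSize, min(maxSize, blockSize)). */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb;
        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE, so the block size wins.
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb);       // 128
        // Lowering maxSize below the block size makes splits smaller (more MapTasks).
        System.out.println(computeSplitSize(blockSize, 1L, 64 * mb) / mb);              // 64
        // Raising minSize above the block size makes splits larger (fewer MapTasks).
        System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb); // 256
    }
}
```

In short, lowering maxSize below the block size yields more, smaller splits, while raising minSize above the block size yields fewer, larger ones.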
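II. Split mechanisms
1. TextInputFormat
How it splits: TextInputFormat is the default input format and plans splits file by file. With the default split size of 128 MB, a 200 MB file yields two splits, one of 128 MB and one of 72 MB, and two MapTasks are launched to process them. A file of only 1 MB still gets its own split and its own MapTask, so 10 such small files launch 10 MapTasks.
How it reads: TextInputFormat reads the file one line per record. The key is the byte offset at which the line starts within the file, of type LongWritable; the value is the content of the line without any line terminator (line feed or carriage return), of type Text, which plays the role of String in Java.
For example: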
Birds of a feather flock together
Barking dogs seldom bite
Bad news has wings
Reading this file line by line with TextInputFormat produces the following keys and values:
(0,Birds of a feather flock together)
(34,Barking dogs seldom bite)
(59,Bad news has wings)
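The keys 0, 34 and 59 are simply running byte counts. The sketch below recomputes them in plain Java, assuming UTF-8 text and a single "\n" terminator per line; the class and method names are invented for the illustration and are not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reproduces the byte-offset keys of a line-oriented reader. */
final class LineOffsetSketch {
    /** Returns the starting byte offset of every line (assumes "\n" endings and UTF-8). */
    static List<Long> lineOffsets(String content) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        for (String line : content.split("\n")) {
            offsets.add(offset);
            // Advance by the line's byte length plus one byte for the '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String content = "Birds of a feather flock together\n"
                + "Barking dogs seldom bite\n"
                + "Bad news has wings\n";
        System.out.println(lineOffsets(content)); // [0, 34, 59]
    }
}
```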
Demo: the test case below is a word count over 4 input files.
Create the Mapper class WordCountMapper:
package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
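/*
* LongWritable - input key: byte offset of the line within the file
* Text - input value: the content of one line
* Text - output key: a word
* IntWritable - output value: the count for that word
* */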
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//1. Read one line
String line = value.toString();
if(line.isEmpty()) {
return;
}
//2. Split the line into words on spaces
String[] words = line.split(" ");
//3. Emit the mapper output
for(String word : words) {
/* set the output key */
k.set(word);
/* write the mapper's key/value pair to the context */
context.write(k, v);
}
}
}
Create the Reducer class:
package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* Text - input key (the key emitted by the Mapper)
* IntWritable - input value (the count emitted by the Mapper)
* Text - output key
* IntWritable - output value
* */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text text, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1. Sum the counts for this key
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
//2. Build the reducer output
IntWritable v = new IntWritable(sum);
context.write(text, v);
}
}
Create the driver class:
/* Test TextInputFormat */
public void testTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException{
//1. Get the job configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Set the class used to locate the job jar
job.setJarByClass(WordCountDriver.class);
//3. Set the Mapper and Reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out"));
//7. Submit the job
boolean flag = job.waitForCompletion(true);
System.out.println("flag: " + flag);
}
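Run the test; the log output contains the following line: number of splits:4.
The 4 input files 1.txt, 2.txt, 3.txt and 4.txt are each smaller than 128 MB, so each file becomes one split.
2. CombineTextInputFormat
If a job's input contains many small files, the default TextInputFormat launches one MapTask per file, which wastes resources. CombineTextInputFormat targets this case: it can logically pack multiple small files into a single split. Split formation proceeds in two phases, a virtual storage phase and a splitting phase.
(1) Virtual storage phase
Each file in the input directory is compared in turn with the value configured via setMaxInputSplitSize. If the file is no larger than the maximum, it forms one logical block. If it is larger than twice the maximum, a block of the maximum size is cut off first. When the remaining data is larger than the maximum but no more than twice the maximum, it is divided evenly into 2 virtual storage blocks, which avoids producing a very small split. For example, with setMaxInputSplitSize set to 4 MB and an 8.02 MB input file, a 4 MB block is cut first. The remaining 4.02 MB, if cut at 4 MB again, would leave a tiny 0.02 MB virtual block, so it is divided into two blocks of 2.01 MB each.
(2) Splitting phase
Each virtual storage block is checked against the setMaxInputSplitSize value: if it is greater than or equal to that value, it forms a split on its own; otherwise it is merged with the next virtual storage block(s) until the combined size reaches that value, and together they form one split.
The following uses 1.txt (576 KB), 2.txt (1151 KB), 3.txt (2302 KB) and 4.txt (4604 KB) under "D:\tmp\word\in" as an example, with setMaxInputSplitSize set to 2 MB (2048 KB). 1.txt at 576 KB is smaller than 2 MB and forms one block; 2.txt at 1151 KB is also smaller than 2 MB and forms one block; 3.txt at 2302 KB is larger than 2 MB but smaller than 4 MB, so it forms two blocks of 1151 KB each; 4.txt at 4604 KB is larger than 4 MB, so a 2 MB block is cut first, leaving 4604 - 1024*2 = 2556 KB, which is larger than 2 MB and smaller than 4 MB and therefore forms two blocks of 1278 KB each. The virtual storage phase thus produces 7 blocks:
576K, 1151K, (1151K, 1151K), (2048K, 1278K, 1278K)
In the splitting phase, the first 3 blocks sum to 576K + 1151K + 1151K = 2878K > 2M and form one split; the 4th and 5th blocks sum to 1151K + 2048K = 3199K > 2M and form one split; the last two blocks sum to 1278K + 1278K = 2556K > 2M and form one split. In the end the 4 files form 3 splits, and three MapTasks are launched to process them.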
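The two phases can be replayed on the four example files with a small simulation. This is a sketch of the procedure as this article describes it, not the actual CombineTextInputFormat implementation; the class and method names are hypothetical, sizes are in KB, and cutting off full-size blocks repeatedly for very large files is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the two-phase procedure described above (sizes in KB). */
final class CombineSplitSketch {
    /** Phase 1: cut every file into virtual storage blocks. */
    static List<Long> virtualBlocks(long[] fileSizes, long maxSize) {
        List<Long> blocks = new ArrayList<>();
        for (long size : fileSizes) {
            long remaining = size;
            // Assumption: keep cutting full-size blocks while more than 2 x maxSize remains.
            while (remaining > 2 * maxSize) {
                blocks.add(maxSize);
                remaining -= maxSize;
            }
            if (remaining > maxSize) {
                // Between maxSize and 2 x maxSize: divide evenly into two blocks.
                long half = remaining / 2;
                blocks.add(half);
                blocks.add(remaining - half);
            } else if (remaining > 0) {
                blocks.add(remaining);
            }
        }
        return blocks;
    }

    /** Phase 2: merge consecutive blocks until the running total reaches maxSize. */
    static List<Long> splits(List<Long> blocks, long maxSize) {
        List<Long> result = new ArrayList<>();
        long current = 0;
        for (long block : blocks) {
            current += block;
            if (current >= maxSize) {
                result.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            result.add(current); // leftover blocks form a final, smaller split
        }
        return result;
    }

    public static void main(String[] args) {
        long[] files = {576, 1151, 2302, 4604};
        List<Long> blocks = virtualBlocks(files, 2048);
        System.out.println(blocks);               // [576, 1151, 1151, 1151, 2048, 1278, 1278]
        System.out.println(splits(blocks, 2048)); // [2878, 3199, 2556]
    }
}
```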
Demo: test with the files under the D:\tmp\word\in directory described above. WordCountMapper and WordCountReducer are the same as in the previous example; the driver is as follows:
/* Test CombineTextInputFormat */
public void testCombineTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException {
//1. Get the job configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Set the class used to locate the job jar
job.setJarByClass(WordCountDriver.class);
//3. Set the Mapper and Reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out"));
//7. Set the input format and the maximum split size
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 2097152); // 2 MB
//8. Submit the job
boolean flag = job.waitForCompletion(true);
System.out.println("flag: " + flag);
}
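Run the test class; the log output contains:
number of splits:3
3. KeyValueTextInputFormat
KeyValueTextInputFormat is similar to TextInputFormat: it reads records line by line and plans splits the same way, so a small file forms a single split. The difference is that a separator can be specified, and each line is divided at the separator into a key/value pair. For example:
A-this is a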
B-this is b
C-this is c
C-this is c
With "-" as the separator, the key/value pairs delivered to the mapper are:
(A,this is a)
(B,this is b)
(C,this is c)
(C,this is c)
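The way one line turns into a key/value pair can be imitated in plain Java. The sketch below is not the Hadoop record reader itself; the class and method names are hypothetical. It follows the reader's behavior of dividing at the first occurrence of the separator and, when the separator is absent, using the whole line as the key with an empty value.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Hypothetical imitation of how a line is divided into key and value. */
final class KeyValueSplitSketch {
    /** Splits at the first separator; without a separator the whole line is the key. */
    static Map.Entry<String, String> splitLine(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new SimpleEntry<>(line, "");
        }
        return new SimpleEntry<>(line.substring(0, pos), line.substring(pos + 1));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv = splitLine("A-this is a", '-');
        System.out.println("(" + kv.getKey() + "," + kv.getValue() + ")"); // (A,this is a)
    }
}
```

Note that only the first separator counts: a line such as "k-v-w" would produce the key "k" and the value "v-w".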
The job below counts how many lines start with each letter. The Mapper class:
package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
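/*
* Text - input key: the part of the line before the separator
* Text - input value: the part of the line after the separator
* Text - output key
* LongWritable - output value: the count for that key
* */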
public class WordCountMapper extends Mapper<Text, Text, Text, LongWritable>{
LongWritable v = new LongWritable(1);
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
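//1. Read the value part of the line; skip records whose value is empty
String line = value.toString();
if(line.isEmpty()) {
return;
}
//2. Emit (key, 1); the key was already separated out by KeyValueTextInputFormat
context.write(key, v);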
}
}
The Reducer class:
package com.lzj.hadoop.input;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* Text - input key (the key emitted by the Mapper)
* LongWritable - input value (the count emitted by the Mapper)
* Text - output key
* LongWritable - output value
* */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text text, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1. Sum the counts for this key
long sum = 0;
for(LongWritable value : values) {
sum += value.get();
}
//2. Build the reducer output
LongWritable v = new LongWritable(sum);
context.write(text, v);
}
}
The driver:
/* Test KeyValueTextInputFormat */
public static void testkeyValeTextInputFormat() throws IOException, ClassNotFoundException, InterruptedException {
//1. Get the job configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Set the class used to locate the job jar
job.setJarByClass(WordCountDriver.class);
//3. Set the Mapper and Reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in1/1.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out6"));
//7. Set the input format and the key/value separator.
// Job.getInstance(conf) copies conf, so the separator is set on the job's own configuration
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "-");
//8. Submit the job
boolean flag = job.waitForCompletion(true);
System.out.println("flag: " + flag);
}
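Run the test; the reported number of splits is 1.
4. NLineInputFormat
NLineInputFormat splits a file by a specified number of lines. If the file has n lines in total and each split holds N lines, the number of splits is n/N when N divides n evenly, and n/N + 1 (using integer division) otherwise. Take the following test file as an example:
There is no royal road to learning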
It is never too old to learn
A man becomes learned by asking questions
Absence makes the heart grow fonder
When the cat is away, the mice will play
No cross, no crown
Ill news travels fast
He that climbs high falls heavily
From saving comes having
Experience is the mother of wisdom
East or west, home is best
Don't teach your grandmother to suck eggs
Don't trouble trouble until trouble troubles you
Doing is better than saying
Birds of a feather flock together
Barking dogs seldom bite
Bad news has wings
As the tree, so the fruit
An idle youth, a needy age
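The file has 19 lines. With 5 lines per split, it is divided into 19/5 + 1 = 4 splits (integer division: three splits of 5 lines and one of 4). When reading the file, the Mapper behaves as with TextInputFormat: records are read line by line, the key is the byte offset of the line within the file, and the value is the content of the line. For example:
(0,There is no royal road to learning)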
(35,It is never too old to learn)
(64,A man becomes learned by asking questions)
……
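The split count for a given line count is a ceiling division, which the following plain-Java sketch makes explicit. The class and method names are hypothetical and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: how many splits N-lines-per-split produces for a file of n lines. */
final class NLineSplitCountSketch {
    /** Ceiling of totalLines / linesPerSplit. */
    static int countSplits(int totalLines, int linesPerSplit) {
        if (linesPerSplit <= 0) {
            throw new IllegalArgumentException("linesPerSplit must be positive");
        }
        return (totalLines + linesPerSplit - 1) / linesPerSplit;
    }

    /** Number of lines in each split, in order. */
    static List<Integer> linesPerSplit(int totalLines, int linesPerSplit) {
        List<Integer> sizes = new ArrayList<>();
        for (int start = 0; start < totalLines; start += linesPerSplit) {
            sizes.add(Math.min(linesPerSplit, totalLines - start));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(countSplits(19, 5));   // 4
        System.out.println(linesPerSplit(19, 5)); // [5, 5, 5, 4]
    }
}
```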
A word count over this test file follows. Create the Mapper class:
package com.lzj.hadoop.input.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NLineInputFormatMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
Text k = new Text();
LongWritable v = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//1. Get one line
String line = value.toString();
//2. Split the line into words
String[] words = line.split(" ");
//3. Emit each word
for(String word : words) {
k.set(word);
context.write(k, v);
}
}
}
Create the Reducer class:
package com.lzj.hadoop.input.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NLineInputFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for(LongWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
Create the Driver test class:
package com.lzj.hadoop.input.nline;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NLineInputFormatDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1. Get the job configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Set the class used to locate the job jar
job.setJarByClass(NLineInputFormatDriver.class);
//3. Set the Mapper and Reducer classes
job.setMapperClass(NLineInputFormatMapper.class);
job.setReducerClass(NLineInputFormatReducer.class);
//4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in2"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out7"));
//7. Set the input format and the number of lines per split
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 5);
//8. Submit the job
boolean flag = job.waitForCompletion(true);
System.out.println("flag: " + flag);
}
}
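Run the test class; the log reports the number of splits:
number of splits:4
5. Custom InputFormat
Besides the input formats that ship with Hadoop, you can define your own to meet custom requirements. A custom input format needs a custom RecordReader to read the file and a custom InputFormat to control how the input is split; the rest of the development is the same as above: create the Mapper, Reducer and driver classes. The example below merges 3 small files into one large file. First, the custom RecordReader:
package com.lzj.hadoop.input.custom;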
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class CustomRecordReader extends RecordReader<Text, BytesWritable>{
private FileSplit split;
private Configuration conf;
private Text key = new Text();
private BytesWritable value = new BytesWritable();
private Boolean isProgress = true;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit) split;
conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(isProgress) {
FSDataInputStream inputStream = null;
try {
/*1. Get the file system*/
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
/*2. Open the input stream*/
inputStream = fs.open(path);
/*3. Read the whole file into memory*/
byte[] buf = new byte[(int) split.getLength()];
IOUtils.readFully(inputStream, buf, 0, buf.length);
/*4. Set the output value to the file content*/
value.set(buf, 0, buf.length);
/*5. Set the output key to the file path*/
String fileName = split.getPath().toString();
key.set(fileName);
} catch (Exception e) {
e.printStackTrace();
}finally {
/*6. Close the stream*/
IOUtils.closeStream(inputStream);
}
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
// Nothing to release here: the input stream is already closed in nextKeyValue()
}
}
Next, the custom FileInputFormat:
package com.lzj.hadoop.input.custom;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class CustomFileInputFormat extends FileInputFormat<Text, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
CustomRecordReader recorder = new CustomRecordReader();
recorder.initialize(split, context);
return recorder;
}
}
Then create the Mapper class:
package com.lzj.hadoop.input.custom;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CstomMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
@Override
protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {
// The file name (path) is the key and the file content is the value
context.write(key, value);
}
}
After that, create the Reducer class:
package com.lzj.hadoop.input.custom;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CustomReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{
@Override
protected void reduce(Text key, Iterable<BytesWritable> values,
Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
/* Write the key (file name) and the value (file content) into the single output file */
context.write(key, values.iterator().next());
}
}
Finally, create the Driver class:
package com.lzj.hadoop.input.custom;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class CustomDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1. Get the job configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Set the class used to locate the job jar
job.setJarByClass(CustomDriver.class);
//3. Set the Mapper and Reducer classes
job.setMapperClass(CstomMapper.class);
job.setReducerClass(CustomReducer.class);
//4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
//5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
//6. Set the input format
job.setInputFormatClass(CustomFileInputFormat.class);
//7. Set the output format
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//8. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:/tmp/word/in3"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/word/out7"));
//9. Submit the job
boolean flag = job.waitForCompletion(true);
System.out.println("flag: " + flag);
}
}
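Running the driver generates a part-r-00000 file in the out7 directory. Opening it shows that the names and contents of 1.txt, 2.txt and 3.txt from the in3 directory have been written into it. From then on, this single file can be read directly, and the content of each small file can be looked up by its key (the file name).
Hadoop serialization (custom transfer objects)
Serialization converts an in-memory object into a byte sequence so that it can be sent over the network or persisted to disk, where it survives a power loss.
The most commonly used basic types in Hadoop already implement the org.apache.hadoop.io.Writable interface, for example BooleanWritable, ByteWritable, IntWritable, FloatWritable, LongWritable, DoubleWritable, Text, MapWritable and ArrayWritable. Objects of these types can be serialized and transferred between the Mapper and the Reducer or persisted to disk. A custom class that implements Writable gains the same capability.
Example: a text file user.txt records, on each line, a worker's id, sex, hourly rate and hours worked, with the fields separated by tabs. Some workers hold several jobs and therefore have several records. The job below computes the sex and total amount for each worker id. The content of user.txt is:
12001 male 10 5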
12002 female 8 7
12003 male 15 5
12004 male 12 10
12005 female 7 12
12003 male 16 5
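1. Create the bean for the input data
Create a User bean that implements the Writable interface. Two methods must be overridden: write (serialization) and readFields (deserialization). readFields must read the fields in exactly the same order in which write writes them. For example:
package com.lzj.hadoop.serialize;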
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/* Implements the Writable interface */
public class User implements Writable {
private String sex;
private int amount;
/* No-arg constructor, invoked during deserialization */
public User() {
super();
}
/* Serialization */
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(sex);
out.writeInt(amount);
}
/* Deserialization: fields must be read in the same order as write() writes them */
@Override
public void readFields(DataInput in) throws IOException {
this.sex = in.readUTF();
this.amount = in.readInt();
}
@Override
public String toString() {
return sex + "\t" + "\t" + amount;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
}
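The ordering rule can be tried without a cluster. The sketch below is a plain-Java stand-in for the User bean: the names UserRecord and roundTrip are made up, and java.io streams replace the Hadoop framework. It writes the two fields into a byte array and reads them back in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical round-trip demo; UserRecord mimics the User bean without Hadoop. */
final class WritableRoundTripSketch {
    static final class UserRecord {
        String sex;
        int amount;

        void write(DataOutput out) throws IOException {
            out.writeUTF(sex);    // field 1
            out.writeInt(amount); // field 2
        }

        void readFields(DataInput in) throws IOException {
            sex = in.readUTF();   // field 1, same order as write()
            amount = in.readInt();
        }
    }

    /** Serializes the record to bytes and deserializes it into a fresh object. */
    static UserRecord roundTrip(UserRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            UserRecord copy = new UserRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UserRecord user = new UserRecord();
        user.sex = "male";
        user.amount = 155;
        UserRecord copy = roundTrip(user);
        System.out.println(copy.sex + "\t" + copy.amount);
    }
}
```

If readFields read the int before the string, the bytes would be misinterpreted and the call would fail or return garbage, which is why the order must match.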
2. Create the Mapper to parse the data
package com.lzj.hadoop.serialize;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class UserMapper extends Mapper<LongWritable, Text, Text, User>{
Text k = new Text();
User v = new User();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*1. Get one line*/
String line = value.toString();
/*2. Split the fields on tabs*/
String[] fields = line.split("\t");
/*3. Use the user id as the key*/
String userId = fields[0];
/*4. Read the hourly rate and the hours, then compute the amount*/
int price = Integer.valueOf(fields[2]);
int hours = Integer.valueOf(fields[3]);
int amount = price * hours;
/*5. Populate the output key/value pair*/
k.set(userId); // set the key
v.setSex(fields[1]);
v.setAmount(amount);
context.write(k, v);
}
}
3. Create the Reducer to merge the data
package com.lzj.hadoop.serialize;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class UserReducer extends Reducer<Text, User, Text, User>{
@Override
protected void reduce(Text key, Iterable<User> values, Context context)
throws IOException, InterruptedException {
int amount = 0;
/* Iterate over the values to accumulate the total amount */
String sex = null;
for(User u : values) {
amount += u.getAmount();
sex = u.getSex();
}
/* Build the Reducer output object */
User user = new User();
user.setSex(sex);
user.setAmount(amount);
context.write(key, user);
}
}
4. Create the job's driver class
package com.lzj.hadoop.serialize;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class UserDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/* Get the job configuration */
Configuration config = new Configuration();
Job job = Job.getInstance(config);
/* Set the class used to locate the job jar */
job.setJarByClass(UserDriver.class);
/* Set the Mapper and Reducer classes */
job.setMapperClass(UserMapper.class);
job.setReducerClass(UserReducer.class);
/* Set the Mapper output key/value types */
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(User.class);
/* Set the final output key/value types */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(User.class);
/* Set the job's input and output paths */
FileInputFormat.setInputPaths(job, new Path("D:/tmp/user.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:/tmp/userOut"));
/* Submit the job */
boolean flag = job.waitForCompletion(true);
System.out.println(flag);
}
}
5. Test
Run the job's driver class UserDriver; the output is:
12001 male 50
12002 female 56
12003 male 155
12004 male 120
12005 female 84
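The totals above can be cross-checked without running a MapReduce job. The sketch below redoes the same arithmetic (hourly rate times hours, summed per worker id) in plain Java over the six sample records; the class name is hypothetical and tab-separated fields are assumed, as in the Mapper.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical cross-check of the per-worker totals, without Hadoop. */
final class UserTotalsSketch {
    /** Sums price * hours per worker id from tab-separated lines: id, sex, price, hours. */
    static Map<String, Integer> totals(String[] lines) {
        Map<String, Integer> result = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            int amount = Integer.parseInt(fields[2]) * Integer.parseInt(fields[3]);
            result.merge(fields[0], amount, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {
            "12001\tmale\t10\t5",
            "12002\tfemale\t8\t7",
            "12003\tmale\t15\t5",
            "12004\tmale\t12\t10",
            "12005\tfemale\t7\t12",
            "12003\tmale\t16\t5"
        };
        // {12001=50, 12002=56, 12003=155, 12004=120, 12005=84}
        System.out.println(totals(lines));
    }
}
```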