之前写了一篇分析MapReduce实现矩阵乘法算法的文章:Mapreduce实现矩阵乘法的算法思路 http://www.linuxidc.com/Linux/2014-09/106646.htm
为了让大家更直观地了解程序的执行过程,今天编写了实现代码供大家参考。
编程环境:
- java version "1.7.0_40"
- Eclipse Kepler
- Windows7 x64
- Ubuntu 12.04 LTS
- Hadoop2.2.0
- Vmware 9.0.0 build-812388
输入数据:
A矩阵存放地址:hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa
A矩阵内容(2行3列,文件中每一行对应矩阵的一行,元素之间以空格分隔):
3 4 6
4 0 8
B矩阵存放地址:hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb
B矩阵内容(3行2列,文件中每一行对应矩阵的一行,元素之间以空格分隔):
2 3
3 0
4 1
实现代码:
一共三个类:
- 驱动类MMDriver
- Map类MMMapper
- Reduce类MMReducer
大家可根据个人习惯合并成一个类使用。
MMDriver.java
package dataguru.matrixmultiply;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MMDriver { public static void main(String[] args) throws Exception { // set configuration Configuration conf = new Configuration();
// create job Job job = new Job(conf,"MatrixMultiply"); job.setJarByClass(dataguru.matrixmultiply.MMDriver.class); // specify Mapper & Reducer job.setMapperClass(dataguru.matrixmultiply.MMMapper.class); job.setReducerClass(dataguru.matrixmultiply.MMReducer.class); // specify output types of mapper and reducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // specify input and output DIRECTORIES Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA"); Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB"); Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC"); FileInputFormat.addInputPath(job, inPathA); FileInputFormat.addInputPath(job, inPathB); FileOutputFormat.setOutputPath(job,outPath);
// delete output directory try{ FileSystem hdfs = outPath.getFileSystem(conf); if(hdfs.exists(outPath)) hdfs.delete(outPath); hdfs.close(); } catch (Exception e){ e.printStackTrace(); return ; } // run the job System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MMMapper.java
package dataguru.matrixmultiply;
import java.io.IOException; import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MMMapper extends Mapper<Object, Text, Text, Text> { private String tag; //current matrix private int crow = 2;// 矩阵A的行数 private int ccol = 2;// 矩阵B的列数 private static int arow = 0; //current arow private static int brow = 0; //current brow @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO get inputpath of input data, set to tag FileSplit fs = (FileSplit)context.getInputSplit(); tag = fs.getPath().getParent().getName(); }
/** * input data include two matrix files */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer str = new StringTokenizer(value.toString()); if ("matrixA".equals(tag)) { //left matrix,output key:x,y int col = 0; while (str.hasMoreTokens()) { String item = str.nextToken(); //current x,y = line,col for (int i = 0; i < ccol; i ) { Text outkey = new Text(arow "," i); Text outvalue = new Text("a," col "," item); context.write(outkey, outvalue); System.out.println(outkey " | " outvalue); } col ; } arow ; }else if ("matrixB".equals(tag)) { int col = 0; while (str.hasMoreTokens()) { String item = str.nextToken(); //current x,y = line,col for (int i = 0; i < crow; i ) { Text outkey = new Text(i "," col); Text outvalue = new Text("b," brow "," item); context.write(outkey, outvalue); System.out.println(outkey " | " outvalue); } col ; } brow ; } } }
MMReducer.java
package dataguru.matrixmultiply;
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context;
public class MMReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String,String> matrixa = new HashMap<String,String>(); Map<String,String> matrixb = new HashMap<String,String>(); for (Text val : values) { //values example : b,0,2 or a,0,4 StringTokenizer str = new StringTokenizer(val.toString(),","); String sourceMatrix = str.nextToken(); if ("a".equals(sourceMatrix)) { matrixa.put(str.nextToken(), str.nextToken()); //(0,4) } if ("b".equals(sourceMatrix)) { matrixb.put(str.nextToken(), str.nextToken()); //(0,2) } } int result = 0; Iterator<String> iter = matrixa.keySet().iterator(); while (iter.hasNext()) { String mapkey = iter.next(); result = Integer.parseInt(matrixa.get(mapkey)) * Integer.parseInt(matrixb.get(mapkey)); }
context.write(key, new Text(String.valueOf(result))); } }