MapReduce Matrix Multiplication – Implementation Code

2022-07-03 12:17:04

I previously wrote an article analyzing the algorithm behind matrix multiplication in MapReduce: Mapreduce实现矩阵乘法的算法思路, http://www.linuxidc.com/Linux/2014-09/106646.htm

To give you a more concrete picture of how the program executes, I have now written the implementation code for reference.

Development environment:

  • java version "1.7.0_40"
  • Eclipse Kepler
  • Windows7 x64
  • Ubuntu 12.04 LTS
  • Hadoop 2.2.0
  • Vmware 9.0.0 build-812388 

Input data:

Matrix A location: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa

Matrix A contents (a 2×3 matrix, stored one row per line):

3 4 6
4 0 8

Matrix B location: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb

Matrix B contents (a 3×2 matrix, stored one row per line):

2 3
3 0
4 1
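As a sanity check, and assuming the one-row-per-line layout above, the expected product C = A × B can be computed by hand:

C[0][0] = 3*2 + 4*3 + 6*4 = 42
C[0][1] = 3*3 + 4*0 + 6*1 = 15
C[1][0] = 4*2 + 0*3 + 8*4 = 40
C[1][1] = 4*3 + 0*0 + 8*1 = 20

So the job should produce the 2×2 matrix ((42, 15), (40, 20)).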

Implementation code:

There are three classes in total:

  • Driver class: MMDriver
  • Mapper class: MMMapper
  • Reducer class: MMReducer

Feel free to merge them into a single class if that suits your style.

MMDriver.java

package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {
        // set configuration
        Configuration conf = new Configuration();

        // create job
        Job job = new Job(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output DIRECTORIES
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists,
        // otherwise the job refuses to start
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath))
                hdfs.delete(outPath, true);  // true: delete recursively
            hdfs.close();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
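To run the job, compile the three classes into a jar and submit it with the hadoop client. The jar name below is only a placeholder for whatever your build produces; no command-line arguments are needed because the input and output paths are hardcoded in MMDriver:

hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver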

MMMapper.java

package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {
    private String tag;          // which matrix the current split belongs to
    private int crow = 2;        // number of rows of matrix A
    private int ccol = 2;        // number of columns of matrix B
    private static int arow = 0; // current row of A
    private static int brow = 0; // current row of B

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // get the input path of this split and use its parent directory
        // name ("matrixA" or "matrixB") as the tag
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input data consists of the two matrix files.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) {       // left matrix, output key: x,y
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();  // current element A[arow][col]
                // A[arow][col] contributes to every cell of row arow of C
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();  // current element B[brow][col]
                // B[brow][col] contributes to every cell of column col of C
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            brow++;
        }
    }
}
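To make the shuffle concrete, here is what the mapper emits (in the "outkey | outvalue" format of the println above) for the first line of each sample file, with ccol = 2 and crow = 2. For line "3 4 6" of matrixA:

0,0 | a,0,3
0,1 | a,0,3
0,0 | a,1,4
0,1 | a,1,4
0,0 | a,2,6
0,1 | a,2,6

and for line "2 3" of matrixB:

0,0 | b,0,2
1,0 | b,0,2
0,1 | b,0,3
1,1 | b,0,3

Each output cell i,j of C thus receives exactly the row of A and column of B it needs. Two caveats: crow and ccol are hardcoded to the sample dimensions, and the static arow/brow counters only track line numbers correctly when each matrix file is read by a single mapper in order, i.e. one split per file.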

MMReducer.java

package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // collect the contributing elements of A and B for this output
        // cell, keyed by the shared inner index
        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();
        for (Text val : values) {  // value examples: b,0,2 or a,0,4
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken());  // (0,4)
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken());  // (0,2)
            }
        }

        // C[i][j] = sum over k of A[i][k] * B[k][j]
        int result = 0;
        Iterator<String> iter = matrixa.keySet().iterator();
        while (iter.hasNext()) {
            String mapkey = iter.next();
            result += Integer.parseInt(matrixa.get(mapkey))
                    * Integer.parseInt(matrixb.get(mapkey));
        }

        context.write(key, new Text(String.valueOf(result)));
    }
}
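With the sample data, the reducer for key 0,0 receives a,0,3, a,1,4, a,2,6 and b,0,2, b,1,3, b,2,4, and accumulates 3*2 + 4*3 + 6*4 = 42. The output file in matrixC (part-r-00000) should therefore contain, with key and value separated by a tab, exactly the hand-computed product from earlier:

0,0 42
0,1 15
1,0 40
1,1 20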
