前言: 从IT跨度到DT,如今的数据每天都在海量的增长。面对如此巨大的数据,如何能让搜索引擎更好的工作呢?本文作为Hadoop系列的第二篇,将介绍分布式情况下搜索引擎的基础实现,即“倒排索引”。
1.问题描述 将所有不同文件里面的关键词进行存储,并实现快速检索。下面假设有3个文件的数据如下:
代码语言:javascript复制file1.txt:MapReduce is simple
file2.txt:mapReduce is powerful is simple
file3.txt:Hello MapReduce bye MapReduce
最终应生成如下索引结果:
代码语言:javascript复制Hello file3.txt:1
MapReduce file3.txt:2;file2.txt:1;file1.txt:1
bye file3.txt:1
is file2.txt:2;file1.txt:1
powerful file2.txt:1
simple file2.txt:1;file1.txt:1
--------------------------------------------------------
2.设计 首先,我们对读入的数据利用Map操作进行预处理,如图1:
对比之前的单词计数(WorldCount.java),要实现倒排索引单靠Map和Reduce操作明显无法完成,因此中间我们加入'Combine',即合并操作;具体如图2:
--------------------------------------------------------------
3.代码实现
代码语言:javascript复制 1 package pro;
2
3 import java.io.IOException;
4 import java.util.StringTokenizer;
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.IntWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.util.GenericOptionsParser;
16
17 public class InvertedIndex {
18 final static String INPUT_PATH = "hdfs://hadoop0:9000/index_in";
19 final static String OUTPUT_PATH = "hdfs://hadoop0:9000/index_out";
20
21 public static class Map extends Mapper<Object, Text, Text, Text> {
22
23 private Text keyInfo = new Text(); // 存储单词和URL组合
24 private Text valueInfo = new Text(); // 存储词频
25 private FileSplit split; // 存储Split对象
26
27 // 实现map函数
28 public void map(Object key, Text value, Context context)
29 throws IOException, InterruptedException {
30 // 获得<key,value>对所属的FileSplit对象
31 split = (FileSplit) context.getInputSplit();
32 StringTokenizer itr = new StringTokenizer(value.toString());
33 while (itr.hasMoreTokens()) {
34
35 // 只获取文件的名称。
36 int splitIndex = split.getPath().toString().indexOf("file");
37 keyInfo.set(itr.nextToken() ":"
38 split.getPath().toString().substring(splitIndex));
39 // 词频初始化为1
40 valueInfo.set("1");
41 context.write(keyInfo, valueInfo);
42 }
43 }
44 }
45
46 public static class Combine extends Reducer<Text, Text, Text, Text> {
47 private Text info = new Text();
48
49 // 实现reduce函数
50 public void reduce(Text key, Iterable<Text> values, Context context)
51 throws IOException, InterruptedException {
52 // 统计词频
53 int sum = 0;
54 for (Text value : values) {
55 sum = Integer.parseInt(value.toString());
56 }
57
58 int splitIndex = key.toString().indexOf(":");
59 // 重新设置value值由URL和词频组成
60 info.set(key.toString().substring(splitIndex 1) ":" sum);
61 // 重新设置key值为单词
62 key.set(key.toString().substring(0, splitIndex));
63 context.write(key, info);
64 }
65 }
66
67 public static class Reduce extends Reducer<Text, Text, Text, Text> {
68 private Text result = new Text();
69
70 // 实现reduce函数
71 public void reduce(Text key, Iterable<Text> values, Context context)
72 throws IOException, InterruptedException {
73 // 生成文档列表
74 String fileList = new String();
75 for (Text value : values) {
76 fileList = value.toString() ";";
77 }
78 result.set(fileList);
79
80 context.write(key, result);
81 }
82 }
83
84 public static void main(String[] args) throws Exception {
85
86 Configuration conf = new Configuration();
87
88 Job job = new Job(conf, "Inverted Index");
89 job.setJarByClass(InvertedIndex.class);
90
91 // 设置Map、Combine和Reduce处理类
92 job.setMapperClass(Map.class);
93 job.setCombinerClass(Combine.class);
94 job.setReducerClass(Reduce.class);
95
96 // 设置Map输出类型
97 job.setMapOutputKeyClass(Text.class);
98 job.setMapOutputValueClass(Text.class);
99
100 // 设置Reduce输出类型
101 job.setOutputKeyClass(Text.class);
102 job.setOutputValueClass(Text.class);
103
104 // 设置输入和输出目录
105 FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
106 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
107 System.exit(job.waitForCompletion(true) ? 0 : 1);
108 }
109 }
4.测试结果
代码语言:javascript复制Hello file3.txt:1;
MapReduce file3.txt:2;file1.txt:1;file2.txt:1;
bye file3.txt:1;
is file1.txt:1;file2.txt:2;
powerful file2.txt:1;
simple file2.txt:1;file1.txt:1;
Reference:
[1]Hadoop权威指南【A】Tom Wbite
[2]深入云计算·Hadoop应用开发实战详解【A】万川梅 谢正兰
--------------
结语:
从上面的Map---> Combine ----> Reduce操作过程中,我们可以体会到“倒排索引”的过程其实也就是不断组合并拆分字符串的过程,而这也就是Hadoop中MapReduce并行计算的体现。在现今的大部分企业当中,Hadoop主要应用之一就是针对日志进行处理,所以想进军大数据领域的朋友,对于Hadoop的Map/Reduce实现原理可以通过更多的实战操作加深理解。本文仅仅只是牛刀小试,对于Hadoop的深层应用本人也正在慢慢摸索~~