之前写分布式系统课的Lab4的时候只顾埋头做,偶然发现居然是MIT 6.824的lab1的java移植版。正好借此机会复习并整理一下。
Framework
Mapper
代码语言:javascript复制public static void doMap(String jobName, int mapTask, String inFile, int nReduce, MapFunc mapF) {
List<KeyValue> mapResult = mapF.map(inFile, readFile(Paths.get(inFile),false));
mapResult
.stream()
.collect(Collectors.groupingBy(kv -> hashCode(kv.key) % nReduce))
.forEach((reduceTask, value) -> writeFile(Paths.get(Utils.reduceName(jobName, mapTask, reduceTask)), JSON.toJSONString(value)));
}
之前的文章讲过,map本质上就是
代码语言:javascript复制map (in_key, in_value) -> list(out_key, intermediate_value)
这里的这个过程通过传入的MapFunc完成,暂且滞后。readFile是我自己实现的工具函数,可忽视,参数里的false指的是读的不是临时文件不需要删除。
生成KV列表之后,通过stream转变为流,通过hashCode%R进行分组。
然后对每个分组,通过规则输出到对应的中间文件中。
代码语言:javascript复制 public static String reduceName(String jobName, int mapTask, int reduceTask) {
return "mrtmp." jobName "-" mapTask "-" reduceTask;
}
Reducer
代码语言:javascript复制public static void doReduce(String jobName, int reduceTask, String outFile, int nMap, ReduceFunc reduceF) {
Stream<List<KeyValue>> MapResults= IntStream.range(0, nMap)
.mapToObj(m -> JSON.parseArray(Mapper.readFile(Paths.get(Utils.reduceName(jobName, m, reduceTask)),true), KeyValue.class));
Map<String,List<KeyValue>> MapFlatResults = MapResults.flatMap(Collection::stream).collect(Collectors.groupingBy(kv -> kv.key));
Map<String,String> res = new TreeMap<>();
for(Map.Entry<String,List<KeyValue>> entry : MapFlatResults.entrySet()){
res.put(entry.getKey(),reduceF.reduce(entry.getKey(),entry.getValue().stream().map(kv -> kv.value).toArray(String[]::new)));
}
Mapper.writeFile(Paths.get(outFile),JSON.toJSONString(res));
}
首先是从0->M,读取所有的中间文件,第二个参数是true表示需要删除中间文件。结果存储在Stream<List<KeyValue>>中。
然后使用flatMap对于流进行处理,合并list,按照key进行分组。
代码语言:javascript复制reduce (out_key, list(intermediate_value)) -> out_value
但是简单的按key分组之后,得到的是Map<String,List<KeyValue>>而不是Map<String,List<Value>>,因此用下面的步骤取出Value
代码语言:javascript复制entry.getValue().stream().map(kv -> kv.value).toArray(String[]::new)
这样,通过对每个<key,list<value>>执行ReduceFunc,即可得到最终的结果并汇总输出,输出结果是JSON的map。
Application
上面的过程是整个框架的抽象,相当于设计模式的模板方法,实际上执行的Reduce、Map函数都需要我们自己实现。
WordCount
统计多个文件中单词的数目
代码语言:javascript复制public class WordCount {
public static List<KeyValue> mapFunc(String file, String value) {
// Your code here (Part II)
Pattern pattern = Pattern.compile("[a-zA-Z0-9] ");
Matcher matcher = pattern.matcher(value);
List<KeyValue> kvs = new ArrayList<>();
while (matcher.find()) {
kvs.add(new KeyValue(matcher.group(), ""));
}
return kvs;
}
public static String reduceFunc(String key, String[] values) {
// Your code here (Part II)
return Integer.toString(values.length);
}
}
输出则是为每个匹配的单词都创建<word,null>。
个人感觉如果是输出<word,count>会更好,Reduce部分只需要sum,而且节约空间。
代码语言:javascript复制map (file, content) -> list(word, null)
单纯计算传入列表的长度即可。如果上面是<word,count>,这里则进行sum。
代码语言:javascript复制reduce (word, list(null)) -> count
InvertedIndex
统计单词所在的文件,以及总文件数
代码语言:javascript复制public class InvertedIndex {
public static List<KeyValue> mapFunc(String file, String value) {
// Your code here (Part V)
Pattern pattern = Pattern.compile("[a-zA-Z0-9] ");
Matcher matcher = pattern.matcher(value);
List<KeyValue> kvs = new ArrayList<>();
while (matcher.find()) {
kvs.add(new KeyValue(matcher.group(), file));
}
return kvs;
}
public static String reduceFunc(String key, String[] values) {
// Your code here (Part V)
return Stream.of(values).sorted().distinct().count() " "
Stream.of(values).sorted().distinct().collect(Collectors.joining(","));
}
输出则是为每个匹配的单词都创建<word,file>,感觉这里先做一个去重可能会比较好。
代码语言:javascript复制map (file, content) -> list(word, file)
这里事实上应该先存Stream.of(values).sorted().distinct()的,不过懒。如果上面做了去重这里就不需要做了。
输出是将所有文件用逗号分隔,前面加上count。
代码语言:javascript复制reduce (word, list(file)) -> InvertedIndex
Scheduler
此外,MapReduce显然是多线程的,虽然这是单机Lab,不过还是需要考虑到worker失败的可能性。(master没考虑,论文说打log,做备份,从checkpoint开始重新启动)
代码语言:javascript复制public static void schedule(String jobName, String[] mapFiles, int nReduce, JobPhase phase, Channel<String> registerChan) {
int nTasks = -1; // number of map or reduce tasks
int nOther = -1; // number of inputs (for reduce) or outputs (for map)
switch (phase) {
case MAP_PHASE:
nTasks = mapFiles.length;
nOther = nReduce;
break;
case REDUCE_PHASE:
nTasks = nReduce;
nOther = mapFiles.length;
break;
}
System.out.println(String.format("Schedule: %d %s tasks (%d I/Os)", nTasks, phase, nOther));
/**
// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
//
*/
try {
CountDownLatch latch = new CountDownLatch(nTasks);
for(int i = 0; i<nTasks;i ){
new WorkerThread(registerChan,new DoTaskArgs(jobName, mapFiles[i], phase, i, nOther),latch).start();
}
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("Schedule: %s done", phase));
}
这里利用CountDownLatch并发控制,当phase中的所有worker都完成任务后,返回。
这里存在设计问题,理论上只要Mapper输出了中间文件,Reduce就能读取,直到所有文件都读取完毕再开始计算。而Lab版是所有文件输出完毕之后再开始读取。效率肯定是不行的。
代码语言:javascript复制 private static class WorkerThread extends Thread {
final Channel<String> workers;
final DoTaskArgs args;
final CountDownLatch latch;
public WorkerThread(Channel<String> workers, DoTaskArgs args, CountDownLatch latch) {
this.workers = workers;
this.args = args;
this.latch = latch;
}
@Override
public void run() {
try {
String worker = workers.read();
Call.getWorkerRpcService(worker).doTask(args);
workers.write(worker);
latch.countDown();
} catch (InterruptedException e) {
Utils.debug("worker thread interrupted");
new WorkerThread(workers,args,latch).start();
} catch (SofaRpcException e) {
Utils.debug("worker failed");
new WorkerThread(workers,args,latch).start();
}
}
}
当worker完成之后递减CountDownLock。如果worker故障,简单地新建worker即可。
这得益于Map无状态纯函数的特点,只要输入相同,输出始终相同,幂等性。所以我们只需要和原论文一样重新调度一个worker执行相同task即可。
不过这是因为中间结果输出在本地,如果输出在分布式环境下的话,master需要通知reducer读的位置改变了。(参考论文)
另一种想法是多备份,也就是同一任务交给多个worker进行。
因为听说明年的分布式系统课改成云计算课(吐槽一波,云计算不就是分布式换皮吗),不知道本lab是不是会继续作为作业,所以助教移植的lab框架的代码就不发布了,只分享我自己写的部分。
https://github.com/sjtuzwj/DistributedSystemLab