MIT 6.824 (2020 Spring) Lab1 - MapReduce 实现

2022-07-06 15:04:39 浏览数 (1)

使用的2020 Spring版的教程和Lab,最新的2021版是由Frans Kaashoek主讲,而不是Robert Morris。再加上Frans的口音都比Morris重很多,手写也比较难认,所以没有使用2021的教程。

个人感觉6.824还是值得听一听的。虽然主要部分在论文里面,但是课堂上的讨论也很有价值。刷课的视频我放在B站上,ID: s09g谷歌摸鱼。

Lab1要求用Go实现一个基本的MapReduce框架。需要先读MapReduce论文。如果英文论文读不下来,我也在B站上做了一期MapReduce论文解读的视频。

6.824所有的Lab都要求用Go实现,所以我先花了点时间去学Go.微信阅读有《Head First Go》,知识结构合理,适合零基础,无限卡可以免费读。看完《Head First Go》之后可以快速过一下Go by Example中文版 ,了解一下还有哪些《Head First Go》没有讲到的地方。然后就可以去写Lab1了,有不清楚的地方直接百度。Lab1最终的实现代码我放在了github.com/s09g/mapreduce-go

MapReduce的执行流程

根据论文第三节,MapReduce的执行流程分这么几步:

  1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

启动MapReduce, 将输入文件切分成大小在16-64MB之间的文件。然后在一组多个机器上启动用户程序

  1. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

其中一个副本将成为master, 余下成为worker. master给worker指定任务(M个map任务,R个reduce任务)。master选择空闲的worker给予map或reduce任务

  1. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

Map worker 接收切分后的input,执行Map函数,将结果缓存到内存

  1. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

缓存后的中间结果会周期性的写到本地磁盘,并切分成R份(reducer数量)。R个文件的位置会发送给master, master转发给reducer

  1. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

Reduce worker 收到中间文件的位置信息,通过RPC读取。读取完先根据中间<k, v>排序,然后按照key分组、合并。

  1. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

Reduce worker在排序后的数据上迭代,将中间<k, v> 交给reduce 函数处理。最终结果写给对应的output文件(分片)

  1. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

所有map和reduce任务结束后,master唤醒用户程序

MapReduce的实现

Master的数据结构设计

这一部分对论文的3.2节有所改动,相比于原论文有所简化。

论文提到每个(Map或者Reduce)Task有分为idle, in-progress, completed 三种状态。

代码语言:javascript复制

type MasterTaskStatus int
const (
    Idle MasterTaskStatus = iota
    InProgress
    Completed
)

Master存储这些Task的信息。与论文不同的是,这里我并没有保留worked的ID,因此master不会主动向worker发送心跳检测

代码语言:javascript复制

type MasterTask struct {
    TaskStatus    MasterTaskStatus
    StartTime     time.Time
    TaskReference *Task
}

此外Master存储Map任务产生的R个中间文件的信息。

代码语言:javascript复制

type Master struct {
    TaskQueue     chan *Task          // 等待执行的task
    TaskMeta      map[int]*MasterTask // 当前所有task的信息
    MasterPhase   State               // Master的阶段
    NReduce       int
    InputFiles    []string
    Intermediates [][]string // Map任务产生的R个中间文件的信息
}

Map和Reduce的Task应该负责不同的事情,但是在实现代码的过程中发现同一个Task结构完全可以兼顾两个阶段的任务。

代码语言:javascript复制

type Task struct {
    Input         string
    TaskState     State
    NReducer      int
    TaskNumber    int
    Intermediates []string
    Output        string
}

此外我将task和master的状态合并成一个State。task和master的状态应该一致。如果在Reduce阶段收到了迟来MapTask结果,应该直接丢弃。

代码语言:javascript复制

type State int
const (
    Map State = iota
    Reduce
    Exit
    Wait
)

MapReduce执行过程的实现

1. 启动master

代码语言:javascript复制

func MakeMaster(files []string, nReduce int) *Master {
    m := Master{
        TaskQueue:     make(chan *Task, max(nReduce, len(files))),
        TaskMeta:      make(map[int]*MasterTask),
        MasterPhase:   Map,
        NReduce:       nReduce,
        InputFiles:    files,
        Intermediates: make([][]string, nReduce),
    }

    // 切成16MB-64MB的文件
    // 创建map任务
    m.createMapTask()

    // 一个程序成为master,其他成为worker
    //这里就是启动master 服务器就行了,
    //拥有master代码的就是master,别的发RPC过来的都是worker
    m.server()
    // 启动一个goroutine 检查超时的任务
    go m.catchTimeOut()
    return &m
}

2. master监听worker RPC调用,分配任务

代码语言:javascript复制

// master等待worker调用
func (m *Master) AssignTask(args *ExampleArgs, reply *Task) error {
    // assignTask就看看自己queue里面还有没有task
    mu.Lock()
    defer mu.Unlock()
    if len(m.TaskQueue) > 0 {
        //有就发出去
        *reply = *<-m.TaskQueue
        // 记录task的启动时间
        m.TaskMeta[reply.TaskNumber].TaskStatus = InProgress
        m.TaskMeta[reply.TaskNumber].StartTime = time.Now()
    } else if m.MasterPhase == Exit {
        *reply = Task{TaskState: Exit}
    } else {
        // 没有task就让worker 等待
        *reply = Task{TaskState: Wait}
    }
    return nil
}

3. 启动worker

代码语言:javascript复制

func Worker(mapf func(string, string) []KeyValue,
            reducef func(string, []string) string) {
    // 启动worker
    for {
        // worker从master获取任务
        task := getTask()

        // 拿到task之后,根据task的state,map task交给mapper, reduce task交给reducer
        // 额外加两个state,让 worker 等待 或者 直接退出
        switch task.TaskState {
        case Map:
            mapper(&task, mapf)
        case Reduce:
            reducer(&task, reducef)
        case Wait:
            time.Sleep(5 * time.Second)
        case Exit:
            return
        }
    }
}

4. worker向master发送RPC请求任务

代码语言:javascript复制

func getTask() Task {
    // worker从master获取任务
    args := ExampleArgs{}
    reply := Task{}
    call("Master.AssignTask", &args, &reply)
    return reply
}

5. worker获得MapTask,交给mapper处理

代码语言:javascript复制

func mapper(task *Task, mapf func(string, string) []KeyValue) {
    //从文件名读取content
    content, err := ioutil.ReadFile(task.Input)
    if err != nil {
        log.Fatal("fail to read file: " task.Input, err)
    }
    //将content交给mapf,缓存结果
    intermediates := mapf(task.Input, string(content))

    //缓存后的结果会写到本地磁盘,并切成R份
    //切分方式是根据key做hash
    buffer := make([][]KeyValue, task.NReducer)
    for _, intermediate := range intermediates {
        slot := ihash(intermediate.Key) % task.NReducer
        buffer[slot] = append(buffer[slot], intermediate)
    }
    mapOutput := make([]string, 0)
    for i := 0; i < task.NReducer; i   {
        mapOutput = append(mapOutput, writeToLocalFile(task.TaskNumber, i, &buffer[i]))
    }
    //R个文件的位置发送给master
    task.Intermediates = mapOutput
    TaskCompleted(task)
}

6. worker任务完成后通知master

代码语言:javascript复制

func TaskCompleted(task *Task) {
    //通过RPC,把task信息发给master
    reply := ExampleReply{}
    call("Master.TaskCompleted", task, &reply)
}

7. master收到完成后的Task

  • 如果所有的MapTask都已经完成,创建ReduceTask,转入Reduce阶段
  • 如果所有的ReduceTask都已经完成,转入Exit阶段
代码语言:javascript复制

func (m *Master) TaskCompleted(task *Task, reply *ExampleReply) error {
    //更新task状态
    mu.Lock()
    if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
        // 因为worker写在同一个文件磁盘上,对于重复的结果要丢弃
        return nil
    }
    m.TaskMeta[task.TaskNumber].TaskStatus = Completed
    mu.Unlock()
    defer m.processTaskResult(task)
    return nil
}

func (m *Master) processTaskResult(task *Task)  {
    mu.Lock()
    defer mu.Unlock()
    switch task.TaskState {
    case Map:
        //收集intermediate信息
        for reduceTaskId, filePath := range task.Intermediates {
            m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)
        }
        if m.allTaskDone() {
            //获得所以map task后,进入reduce阶段
            m.createReduceTask()
            m.MasterPhase = Reduce
        }
    case Reduce:
        if m.allTaskDone() {
            //获得所以reduce task后,进入exit阶段
            m.MasterPhase = Exit
        }
    }
}

这里使用一个辅助函数判断是否当前阶段所有任务都已经完成

代码语言:javascript复制

func (m *Master) allTaskDone() bool {
    for _, task := range m.TaskMeta {
        if task.TaskStatus != Completed {
            return false
        }
    }
    return true
}

8. 转入Reduce阶段,worker获得ReduceTask,交给reducer处理

代码语言:javascript复制

func reducer(task *Task, reducef func(string, []string) string) {
    //先从filepath读取intermediate的KeyValue
    intermediate := *readFromLocalFile(task.Intermediates)
    //根据kv排序
    sort.Sort(ByKey(intermediate))

    dir, _ := os.Getwd()
    tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
    if err != nil {
        log.Fatal("Fail to create temp file", err)
    }
    // 这部分代码修改自mrsequential.go
    i := 0
    for i < len(intermediate) {
        //将相同的key放在一起分组合并
        j := i   1
        for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
            j  
        }
        values := []string{}
        for k := i; k < j; k   {
            values = append(values, intermediate[k].Value)
        }
        //交给reducef,拿到结果
        output := reducef(intermediate[i].Key, values)
        //写到对应的output文件
        fmt.Fprintf(tempFile, "%v %vn", intermediate[i].Key, output)
        i = j
    }
    tempFile.Close()
    oname := fmt.Sprintf("mr-out-%d", task.TaskNumber)
    os.Rename(tempFile.Name(), oname)
    task.Output = oname
    TaskCompleted(task)
}

9. master确认所有ReduceTask都已经完成,转入Exit阶段,终止所有master和worker goroutine

代码语言:javascript复制

func (m *Master) Done() bool {
    mu.Lock()
    defer mu.Unlock()
    ret := m.MasterPhase == Exit
    return ret
}

10. 上锁 master跟多个worker通信,master的数据是共享的,其中TaskMeta, Phase, Intermediates, TaskQueue 都有读写发生。TaskQueue使用channel实现,自己带锁。只有涉及Intermediates, TaskMeta, Phase的操作需要上锁。PS.写的糙一点,那就是master每个方法都要上锁,master直接变成同步执行。。。

另外go -race并不能检测出所有的datarace。我曾一度任务Intermediates写操作发生在map阶段,读操作发生在reduce阶段读,逻辑上存在barrier,所以不会有datarace. 但是后来想到两个write也可能造成datarace,然而Go Race Detector并没有检测出来。

11. carsh处理

test当中有容错的要求,不过只针对worker。mapreduce论文中提到了:

  1. 周期性向worker发送心跳检测
  • 如果worker失联一段时间,master将worker标记成failed
  • worker失效之后
    • 已完成的map task被重新标记为idle
    • 已完成的reduce task不需要改变
    • 原因是:map的结果被写在local disk,worker machine 宕机会导致map的结果丢失;reduce结果存储在GFS,不会随着machine down丢失
  1. 对于in-progress 且超时的任务,启动backup执行

tricky的在于Lab1是在单机上用多进程模拟了多机器,但并不会因为进程终止导致写好的文件丢失,这也是为什么我前面没有按照论文保留workerID.

针对Lab1修改后的容错设计:

  1. 周期性检查task是否完成。将超时未完成的任务,交给新的worker,backup执行
代码语言:javascript复制

func (m *Master) catchTimeOut() {
    for {
        time.Sleep(5 * time.Second)
        mu.Lock()
        if m.MasterPhase == Exit {
            mu.Unlock()
            return
        }
        for _, masterTask := range m.TaskMeta {
            if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
                m.TaskQueue <- masterTask.TaskReference
                masterTask.TaskStatus = Idle
            }
        }
        mu.Unlock()
    }
}

  1. 从第一个完成的worker获取结果,将后序的backup结果丢弃
代码语言:javascript复制

if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
    // 因为worker写在同一个文件磁盘上,对于重复的结果要丢弃
    return nil
}
m.TaskMeta[task.TaskNumber].TaskStatus = Completed

测试

开启race detector,执行test-mr.sh

PS. Lab1要求的Go版本是1.13,如果使用的更新的版本,会报出

代码语言:javascript复制
rpc.Register: method "Done" has 1 input parameters; needs exactly three
代码语言:javascript复制
可以直接无视

0 人点赞